voltage_modbus/
server.rs

//! Modbus server implementations.
//!
//! This module provides complete server-side implementations for both the TCP and RTU protocols.
4
5use async_trait::async_trait;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::net::{TcpListener, TcpStream};
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::sync::{broadcast, Mutex};
12use tokio::time::timeout;
13use log::{info, error, debug, warn};
14use tokio_serial;
15
16use crate::error::{ModbusError, ModbusResult};
17use crate::protocol::{ModbusRequest, ModbusResponse, ModbusFunction};
18use crate::register_bank::{ModbusRegisterBank, RegisterBankStats};
19
20/// Maximum frame size for Modbus TCP
21const MAX_TCP_FRAME_SIZE: usize = 260;
22
/// Size of the MBAP header prefix (transaction id, protocol id, length field), excluding the unit id byte
24const MBAP_HEADER_SIZE: usize = 6;
25
/// Common lifecycle and introspection interface implemented by the Modbus servers in this module.
///
/// Implementors must be `Send + Sync`. The `async` methods are made possible by `async_trait`.
#[async_trait]
pub trait ModbusServer: Send + Sync {
    /// Start the server.
    ///
    /// Returns an error if the server could not be started.
    async fn start(&mut self) -> ModbusResult<()>;
    
    /// Stop the server.
    ///
    /// Returns an error if the server could not be stopped.
    async fn stop(&mut self) -> ModbusResult<()>;
    
    /// Report whether the server is currently running.
    fn is_running(&self) -> bool;
    
    /// Return a snapshot of the server statistics.
    fn get_stats(&self) -> ServerStats;
    
    /// Return a shared handle to the register bank served by this server, if there is one.
    fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>>;
}
44
45/// Server statistics
46#[derive(Debug, Clone, Default)]
47pub struct ServerStats {
48    pub connections_count: u64,
49    pub total_requests: u64,
50    pub successful_requests: u64,
51    pub failed_requests: u64,
52    pub bytes_received: u64,
53    pub bytes_sent: u64,
54    pub uptime_seconds: u64,
55    pub register_bank_stats: Option<RegisterBankStats>,
56}
57
58
59/// Modbus TCP server configuration
60#[derive(Debug, Clone)]
61pub struct ModbusTcpServerConfig {
62    pub bind_address: SocketAddr,
63    pub max_connections: usize,
64    pub request_timeout: Duration,
65    pub register_bank: Option<Arc<ModbusRegisterBank>>,
66}
67
68impl Default for ModbusTcpServerConfig {
69    fn default() -> Self {
70        Self {
71            bind_address: "127.0.0.1:502".parse().unwrap(),
72            max_connections: 100,
73            request_timeout: Duration::from_secs(30),
74            register_bank: None,
75        }
76    }
77}
78
/// Modbus TCP server implementation.
///
/// The server accepts TCP connections in a background task and answers Modbus requests
/// against a shared register bank.
pub struct ModbusTcpServer {
    // Settings supplied at construction time.
    config: ModbusTcpServerConfig,
    // Register bank served to clients (taken from the config or created by `with_config`).
    register_bank: Arc<ModbusRegisterBank>,
    // Counters shared with the per-client handler tasks.
    stats: Arc<Mutex<ServerStats>>,
    // Sender side of the shutdown broadcast; `None` until `start` has been called.
    shutdown_tx: Option<broadcast::Sender<()>>,
    // Set by `start`; cleared by `stop` and when the accept loop exits.
    is_running: Arc<Mutex<bool>>,
    // Instant recorded by `start`, used to compute the uptime.
    start_time: Option<std::time::Instant>,
}
88
89impl ModbusTcpServer {
90    /// Create a new TCP server with default configuration
91    pub fn new(bind_address: &str) -> ModbusResult<Self> {
92        let addr = bind_address.parse()
93            .map_err(|e| ModbusError::invalid_data(format!("Invalid bind address: {}", e)))?;
94        
95        let config = ModbusTcpServerConfig {
96            bind_address: addr,
97            ..Default::default()
98        };
99        
100        Self::with_config(config)
101    }
102    
103    /// Create a new TCP server with custom configuration
104    pub fn with_config(config: ModbusTcpServerConfig) -> ModbusResult<Self> {
105        let register_bank = config.register_bank.clone()
106            .unwrap_or_else(|| Arc::new(ModbusRegisterBank::new()));
107        
108        Ok(Self {
109            config,
110            register_bank,
111            stats: Arc::new(Mutex::new(ServerStats::default())),
112            shutdown_tx: None,
113            is_running: Arc::new(Mutex::new(false)),
114            start_time: None,
115        })
116    }
117    
    /// Replace the register bank used by this server.
    ///
    /// Only the server's own handle is replaced; `config.register_bank` is left untouched.
    /// `start` clones the bank into its background task, so a server that is already
    /// running keeps using the bank it was started with.
    pub fn set_register_bank(&mut self, register_bank: Arc<ModbusRegisterBank>) {
        self.register_bank = register_bank;
    }
122    
123    /// Handle client connection
124    async fn handle_client(
125        stream: TcpStream,
126        register_bank: Arc<ModbusRegisterBank>,
127        stats: Arc<Mutex<ServerStats>>,
128        mut shutdown_rx: broadcast::Receiver<()>,
129        request_timeout: Duration,
130    ) {
131        let peer_addr = stream.peer_addr().unwrap_or_else(|_| "unknown".parse().unwrap());
132        info!("📡 New client connected: {}", peer_addr);
133        
134        // Update connection count
135        {
136            let mut stats = stats.lock().await;
137            stats.connections_count += 1;
138        }
139        
140        let mut stream = stream;
141        let mut buffer = vec![0u8; MAX_TCP_FRAME_SIZE];
142        
143        loop {
144            tokio::select! {
145                // Handle shutdown signal
146                _ = shutdown_rx.recv() => {
147                    debug!("Shutdown signal received for client {}", peer_addr);
148                    break;
149                }
150                
151                // Handle client request
152                result = timeout(request_timeout, stream.read(&mut buffer)) => {
153                    match result {
154                        Ok(Ok(0)) => {
155                            debug!("Client {} disconnected", peer_addr);
156                            break;
157                        }
158                        Ok(Ok(bytes_read)) => {
159                            // Update stats
160                            {
161                                let mut stats = stats.lock().await;
162                                stats.total_requests += 1;
163                                stats.bytes_received += bytes_read as u64;
164                            }
165                            
166                            // Process request
167                            match Self::process_request(&buffer[..bytes_read], &register_bank).await {
168                                Ok(response_data) => {
169                                    if let Err(e) = stream.write_all(&response_data).await {
170                                        error!("Failed to send response to {}: {}", peer_addr, e);
171                                        break;
172                                    } else {
173                                        // Update success stats
174                                        let mut stats = stats.lock().await;
175                                        stats.successful_requests += 1;
176                                        stats.bytes_sent += response_data.len() as u64;
177                                    }
178                                }
179                                Err(e) => {
180                                    error!("Error processing request from {}: {}", peer_addr, e);
181                                    
182                                    // Send error response if possible
183                                    if let Ok(error_response) = Self::create_error_response(&buffer[..bytes_read], 0x01) {
184                                        let _ = stream.write_all(&error_response).await;
185                                    }
186                                    
187                                    // Update error stats
188                                    let mut stats = stats.lock().await;
189                                    stats.failed_requests += 1;
190                                }
191                            }
192                        }
193                        Ok(Err(e)) => {
194                            error!("Read error from {}: {}", peer_addr, e);
195                            break;
196                        }
197                        Err(_) => {
198                            warn!("Read timeout from {}", peer_addr);
199                            break;
200                        }
201                    }
202                }
203            }
204        }
205        
206        info!("🔌 Client {} disconnected", peer_addr);
207    }
208    
209    /// Process Modbus request
210    async fn process_request(
211        frame: &[u8],
212        register_bank: &Arc<ModbusRegisterBank>,
213    ) -> ModbusResult<Vec<u8>> {
214        if frame.len() < MBAP_HEADER_SIZE + 2 {
215            return Err(ModbusError::frame("Frame too short"));
216        }
217        
218        // Parse MBAP header
219        let transaction_id = u16::from_be_bytes([frame[0], frame[1]]);
220        let protocol_id = u16::from_be_bytes([frame[2], frame[3]]);
221        let length = u16::from_be_bytes([frame[4], frame[5]]);
222        let unit_id = frame[6];
223        let function_code = frame[7];
224        
225        if protocol_id != 0 {
226            return Err(ModbusError::frame("Invalid protocol ID"));
227        }
228        
229        if frame.len() < MBAP_HEADER_SIZE + length as usize {
230            return Err(ModbusError::frame("Incomplete frame"));
231        }
232        
233        debug!("Processing request: TID={}, Function=0x{:02x}, Unit={}", 
234               transaction_id, function_code, unit_id);
235        
236        // Parse function and data
237        let data = &frame[MBAP_HEADER_SIZE + 2..MBAP_HEADER_SIZE + length as usize];
238        
239        // Process based on function code
240        let response_data = match function_code {
241            0x01 => Self::handle_read_coils(data, register_bank).await?,
242            0x02 => Self::handle_read_discrete_inputs(data, register_bank).await?,
243            0x03 => Self::handle_read_holding_registers(data, register_bank).await?,
244            0x04 => Self::handle_read_input_registers(data, register_bank).await?,
245            0x05 => Self::handle_write_single_coil(data, register_bank).await?,
246            0x06 => Self::handle_write_single_register(data, register_bank).await?,
247            0x0F => Self::handle_write_multiple_coils(data, register_bank).await?,
248            0x10 => Self::handle_write_multiple_registers(data, register_bank).await?,
249            _ => {
250                return Err(ModbusError::protocol(format!("Unsupported function code: 0x{:02x}", function_code)));
251            }
252        };
253        
254        // Create response frame
255        let response_length = response_data.len() + 2; // +2 for unit_id and function_code
256        let mut response = Vec::with_capacity(MBAP_HEADER_SIZE + response_length);
257        
258        // MBAP header
259        response.extend_from_slice(&transaction_id.to_be_bytes());
260        response.extend_from_slice(&protocol_id.to_be_bytes());
261        response.extend_from_slice(&(response_length as u16).to_be_bytes());
262        
263        // PDU
264        response.push(unit_id);
265        response.push(function_code);
266        response.extend_from_slice(&response_data);
267        
268        Ok(response)
269    }
270    
271    /// Handle read coils (0x01)
272    async fn handle_read_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
273        if data.len() < 4 {
274            return Err(ModbusError::frame("Invalid read coils request"));
275        }
276        
277        let address = u16::from_be_bytes([data[0], data[1]]);
278        let quantity = u16::from_be_bytes([data[2], data[3]]);
279        
280        if quantity == 0 || quantity > 2000 {
281            return Err(ModbusError::invalid_data("Invalid quantity"));
282        }
283        
284        let coils = register_bank.read_coils(address, quantity)?;
285        
286        // Pack bits into bytes
287        let byte_count = (quantity + 7) / 8;
288        let mut response = vec![byte_count as u8];
289        
290        for i in 0..byte_count {
291            let mut byte_value = 0u8;
292            for bit in 0..8 {
293                let coil_index = (i * 8 + bit) as usize;
294                if coil_index < coils.len() && coils[coil_index] {
295                    byte_value |= 1 << bit;
296                }
297            }
298            response.push(byte_value);
299        }
300        
301        Ok(response)
302    }
303    
304    /// Handle read discrete inputs (0x02)
305    async fn handle_read_discrete_inputs(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
306        if data.len() < 4 {
307            return Err(ModbusError::frame("Invalid read discrete inputs request"));
308        }
309        
310        let address = u16::from_be_bytes([data[0], data[1]]);
311        let quantity = u16::from_be_bytes([data[2], data[3]]);
312        
313        if quantity == 0 || quantity > 2000 {
314            return Err(ModbusError::invalid_data("Invalid quantity"));
315        }
316        
317        let inputs = register_bank.read_discrete_inputs(address, quantity)?;
318        
319        // Pack bits into bytes
320        let byte_count = (quantity + 7) / 8;
321        let mut response = vec![byte_count as u8];
322        
323        for i in 0..byte_count {
324            let mut byte_value = 0u8;
325            for bit in 0..8 {
326                let input_index = (i * 8 + bit) as usize;
327                if input_index < inputs.len() && inputs[input_index] {
328                    byte_value |= 1 << bit;
329                }
330            }
331            response.push(byte_value);
332        }
333        
334        Ok(response)
335    }
336    
337    /// Handle read holding registers (0x03)
338    async fn handle_read_holding_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
339        if data.len() < 4 {
340            return Err(ModbusError::frame("Invalid read holding registers request"));
341        }
342        
343        let address = u16::from_be_bytes([data[0], data[1]]);
344        let quantity = u16::from_be_bytes([data[2], data[3]]);
345        
346        if quantity == 0 || quantity > 125 {
347            return Err(ModbusError::invalid_data("Invalid quantity"));
348        }
349        
350        let registers = register_bank.read_holding_registers(address, quantity)?;
351        
352        let byte_count = quantity * 2;
353        let mut response = vec![byte_count as u8];
354        
355        for register in registers {
356            response.extend_from_slice(&register.to_be_bytes());
357        }
358        
359        Ok(response)
360    }
361    
362    /// Handle read input registers (0x04)
363    async fn handle_read_input_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
364        if data.len() < 4 {
365            return Err(ModbusError::frame("Invalid read input registers request"));
366        }
367        
368        let address = u16::from_be_bytes([data[0], data[1]]);
369        let quantity = u16::from_be_bytes([data[2], data[3]]);
370        
371        if quantity == 0 || quantity > 125 {
372            return Err(ModbusError::invalid_data("Invalid quantity"));
373        }
374        
375        let registers = register_bank.read_input_registers(address, quantity)?;
376        
377        let byte_count = quantity * 2;
378        let mut response = vec![byte_count as u8];
379        
380        for register in registers {
381            response.extend_from_slice(&register.to_be_bytes());
382        }
383        
384        Ok(response)
385    }
386    
387    /// Handle write single coil (0x05)
388    async fn handle_write_single_coil(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
389        if data.len() < 4 {
390            return Err(ModbusError::frame("Invalid write single coil request"));
391        }
392        
393        let address = u16::from_be_bytes([data[0], data[1]]);
394        let value = u16::from_be_bytes([data[2], data[3]]);
395        
396        let coil_value = match value {
397            0x0000 => false,
398            0xFF00 => true,
399            _ => return Err(ModbusError::invalid_data("Invalid coil value")),
400        };
401        
402        register_bank.write_single_coil(address, coil_value)?;
403        
404        // Echo back the request
405        Ok(data.to_vec())
406    }
407    
408    /// Handle write single register (0x06)
409    async fn handle_write_single_register(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
410        if data.len() < 4 {
411            return Err(ModbusError::frame("Invalid write single register request"));
412        }
413        
414        let address = u16::from_be_bytes([data[0], data[1]]);
415        let value = u16::from_be_bytes([data[2], data[3]]);
416        
417        register_bank.write_single_register(address, value)?;
418        
419        // Echo back the request
420        Ok(data.to_vec())
421    }
422    
423    /// Handle write multiple coils (0x0F)
424    async fn handle_write_multiple_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
425        if data.len() < 5 {
426            return Err(ModbusError::frame("Invalid write multiple coils request"));
427        }
428        
429        let address = u16::from_be_bytes([data[0], data[1]]);
430        let quantity = u16::from_be_bytes([data[2], data[3]]);
431        let byte_count = data[4] as usize;
432        
433        if data.len() < 5 + byte_count {
434            return Err(ModbusError::frame("Incomplete write multiple coils request"));
435        }
436        
437        // Extract coil values from bytes
438        let mut coils = Vec::with_capacity(quantity as usize);
439        for i in 0..quantity {
440            let byte_index = (i / 8) as usize;
441            let bit_index = i % 8;
442            let byte_value = data[5 + byte_index];
443            let coil_value = (byte_value & (1 << bit_index)) != 0;
444            coils.push(coil_value);
445        }
446        
447        register_bank.write_multiple_coils(address, &coils)?;
448        
449        // Return address and quantity
450        Ok(data[0..4].to_vec())
451    }
452    
453    /// Handle write multiple registers (0x10)
454    async fn handle_write_multiple_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
455        if data.len() < 5 {
456            return Err(ModbusError::frame("Invalid write multiple registers request"));
457        }
458        
459        let address = u16::from_be_bytes([data[0], data[1]]);
460        let quantity = u16::from_be_bytes([data[2], data[3]]);
461        let byte_count = data[4] as usize;
462        
463        if data.len() < 5 + byte_count || byte_count != (quantity as usize * 2) {
464            return Err(ModbusError::frame("Incomplete write multiple registers request"));
465        }
466        
467        // Extract register values
468        let mut registers = Vec::with_capacity(quantity as usize);
469        for i in 0..quantity {
470            let offset = 5 + (i as usize * 2);
471            let value = u16::from_be_bytes([data[offset], data[offset + 1]]);
472            registers.push(value);
473        }
474        
475        register_bank.write_multiple_registers(address, &registers)?;
476        
477        // Return address and quantity
478        Ok(data[0..4].to_vec())
479    }
480    
481    /// Create error response
482    fn create_error_response(request: &[u8], exception_code: u8) -> ModbusResult<Vec<u8>> {
483        if request.len() < MBAP_HEADER_SIZE + 2 {
484            return Err(ModbusError::frame("Request too short for error response"));
485        }
486        
487        let transaction_id = u16::from_be_bytes([request[0], request[1]]);
488        let protocol_id = 0u16;
489        let length = 3u16; // unit_id + function_code + exception_code
490        let unit_id = request[6];
491        let function_code = request[7] | 0x80; // Set exception bit
492        
493        let mut response = Vec::with_capacity(MBAP_HEADER_SIZE + 3);
494        
495        // MBAP header
496        response.extend_from_slice(&transaction_id.to_be_bytes());
497        response.extend_from_slice(&protocol_id.to_be_bytes());
498        response.extend_from_slice(&length.to_be_bytes());
499        
500        // Exception PDU
501        response.push(unit_id);
502        response.push(function_code);
503        response.push(exception_code);
504        
505        Ok(response)
506    }
507}
508
509#[async_trait]
510impl ModbusServer for ModbusTcpServer {
511    async fn start(&mut self) -> ModbusResult<()> {
512        let mut is_running = self.is_running.lock().await;
513        if *is_running {
514            return Err(ModbusError::protocol("Server is already running"));
515        }
516        
517        info!("🚀 Starting Modbus TCP server on {}", self.config.bind_address);
518        
519        let listener = TcpListener::bind(self.config.bind_address).await
520            .map_err(|e| ModbusError::connection(format!("Failed to bind to {}: {}", self.config.bind_address, e)))?;
521        
522        let (shutdown_tx, _) = broadcast::channel(1);
523        self.shutdown_tx = Some(shutdown_tx.clone());
524        self.start_time = Some(std::time::Instant::now());
525        
526        *is_running = true;
527        drop(is_running);
528        
529        info!("✅ Modbus TCP server started successfully");
530        info!("📊 Server configuration:");
531        info!("   - Bind address: {}", self.config.bind_address);
532        info!("   - Max connections: {}", self.config.max_connections);
533        info!("   - Request timeout: {:?}", self.config.request_timeout);
534        
535        let register_bank = self.register_bank.clone();
536        let stats = self.stats.clone();
537        let request_timeout = self.config.request_timeout;
538        let is_running_flag = self.is_running.clone();
539        let mut shutdown_rx = shutdown_tx.subscribe();
540        
541        tokio::spawn(async move {
542            loop {
543                tokio::select! {
544                    result = listener.accept() => {
545                        match result {
546                            Ok((stream, addr)) => {
547                                debug!("Accepted connection from {}", addr);
548                                
549                                let register_bank = register_bank.clone();
550                                let stats = stats.clone();
551                                let shutdown_rx = shutdown_tx.subscribe();
552                                
553                                tokio::spawn(async move {
554                                    Self::handle_client(stream, register_bank, stats, shutdown_rx, request_timeout).await;
555                                });
556                            }
557                            Err(e) => {
558                                error!("Failed to accept connection: {}", e);
559                            }
560                        }
561                    }
562                    _ = shutdown_rx.recv() => {
563                        info!("Shutdown signal received, stopping server");
564                        break;
565                    }
566                }
567            }
568            
569            let mut is_running = is_running_flag.lock().await;
570            *is_running = false;
571        });
572        
573        Ok(())
574    }
575    
576    async fn stop(&mut self) -> ModbusResult<()> {
577        if let Some(shutdown_tx) = &self.shutdown_tx {
578            let _ = shutdown_tx.send(());
579        }
580        
581        let mut is_running = self.is_running.lock().await;
582        *is_running = false;
583        
584        info!("âšī¸  Modbus TCP server stopped");
585        Ok(())
586    }
587    
588    fn is_running(&self) -> bool {
589        // Note: This is a synchronous method, so we can't use async lock
590        // In a real implementation, you might want to use a different approach
591        false // Placeholder
592    }
593    
594    fn get_stats(&self) -> ServerStats {
595        // Note: This is a synchronous method, so we can't use async lock
596        // In a real implementation, you might want to use a different approach
597        let mut stats = ServerStats::default();
598        
599        if let Some(start_time) = self.start_time {
600            stats.uptime_seconds = start_time.elapsed().as_secs();
601        }
602        
603        stats.register_bank_stats = Some(self.register_bank.get_stats());
604        stats
605    }
606    
607    fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>> {
608        Some(self.register_bank.clone())
609    }
610}
611
612/// Modbus RTU server configuration
613#[derive(Debug, Clone)]
614pub struct ModbusRtuServerConfig {
615    pub port: String,
616    pub baud_rate: u32,
617    pub data_bits: tokio_serial::DataBits,
618    pub stop_bits: tokio_serial::StopBits,
619    pub parity: tokio_serial::Parity,
620    pub timeout: Duration,
621    pub frame_gap: Duration,
622    pub register_bank: Option<Arc<ModbusRegisterBank>>,
623}
624
625impl Default for ModbusRtuServerConfig {
626    fn default() -> Self {
627        Self {
628            port: "/dev/ttyUSB0".to_string(),
629            baud_rate: 9600,
630            data_bits: tokio_serial::DataBits::Eight,
631            stop_bits: tokio_serial::StopBits::One,
632            parity: tokio_serial::Parity::None,
633            timeout: Duration::from_secs(1),
634            frame_gap: Duration::from_millis(4), // Default 3.5 char time at 9600 baud
635            register_bank: None,
636        }
637    }
638}
639
/// Modbus RTU server implementation.
///
/// Serves Modbus requests arriving over a serial line against a shared register bank.
pub struct ModbusRtuServer {
    // Settings supplied at construction time.
    config: ModbusRtuServerConfig,
    // Register bank served to clients (taken from the config or created by `with_config`).
    register_bank: Arc<ModbusRegisterBank>,
    // Shared activity counters.
    stats: Arc<Mutex<ServerStats>>,
    // Sender side of the shutdown broadcast; initialised to `None` by `with_config`.
    shutdown_tx: Option<broadcast::Sender<()>>,
    // Running flag; initialised to `false` by `with_config`.
    is_running: Arc<Mutex<bool>>,
    // Start instant; initialised to `None` by `with_config`.
    start_time: Option<std::time::Instant>,
}
649
650impl ModbusRtuServer {
651    /// Create a new RTU server with default configuration
652    pub fn new(port: &str, baud_rate: u32) -> ModbusResult<Self> {
653        let config = ModbusRtuServerConfig {
654            port: port.to_string(),
655            baud_rate,
656            ..Default::default()
657        };
658        
659        Self::with_config(config)
660    }
661    
662    /// Create a new RTU server with custom configuration
663    pub fn with_config(config: ModbusRtuServerConfig) -> ModbusResult<Self> {
664        let register_bank = config.register_bank.clone()
665            .unwrap_or_else(|| Arc::new(ModbusRegisterBank::new()));
666        
667        Ok(Self {
668            config,
669            register_bank,
670            stats: Arc::new(Mutex::new(ServerStats::default())),
671            shutdown_tx: None,
672            is_running: Arc::new(Mutex::new(false)),
673            start_time: None,
674        })
675    }
676    
    /// Replace the register bank used by this server.
    ///
    /// Only the server's own handle is replaced; `config.register_bank` is left untouched.
    pub fn set_register_bank(&mut self, register_bank: Arc<ModbusRegisterBank>) {
        self.register_bank = register_bank;
    }
681    
682    /// Calculate CRC for RTU frames
683    fn calculate_crc(data: &[u8]) -> u16 {
684        use crc::{Crc, CRC_16_MODBUS};
685        const CRC_MODBUS: Crc<u16> = Crc::<u16>::new(&CRC_16_MODBUS);
686        CRC_MODBUS.checksum(data)
687    }
688    
689    /// Process RTU frame
690    async fn process_rtu_frame(
691        frame: &[u8],
692        register_bank: &Arc<ModbusRegisterBank>,
693    ) -> ModbusResult<Vec<u8>> {
694        if frame.len() < 4 {
695            return Err(ModbusError::frame("RTU frame too short"));
696        }
697        
698        // Extract CRC from frame
699        let data_len = frame.len() - 2;
700        let data = &frame[..data_len];
701        let received_crc = u16::from_le_bytes([frame[data_len], frame[data_len + 1]]);
702        
703        // Verify CRC
704        let calculated_crc = Self::calculate_crc(data);
705        if received_crc != calculated_crc {
706            return Err(ModbusError::frame("Invalid CRC"));
707        }
708        
709        if data.len() < 2 {
710            return Err(ModbusError::frame("Invalid RTU frame"));
711        }
712        
713        let slave_id = data[0];
714        let function_code = data[1];
715        let pdu_data = &data[2..];
716        
717        debug!("Processing RTU request: Slave={}, Function=0x{:02x}", slave_id, function_code);
718        
719        // Process based on function code
720        let response_data = match function_code {
721            0x01 => Self::handle_read_coils(pdu_data, register_bank).await?,
722            0x02 => Self::handle_read_discrete_inputs(pdu_data, register_bank).await?,
723            0x03 => Self::handle_read_holding_registers(pdu_data, register_bank).await?,
724            0x04 => Self::handle_read_input_registers(pdu_data, register_bank).await?,
725            0x05 => Self::handle_write_single_coil(pdu_data, register_bank).await?,
726            0x06 => Self::handle_write_single_register(pdu_data, register_bank).await?,
727            0x0F => Self::handle_write_multiple_coils(pdu_data, register_bank).await?,
728            0x10 => Self::handle_write_multiple_registers(pdu_data, register_bank).await?,
729            _ => {
730                return Self::create_rtu_error_response(slave_id, function_code, 0x01);
731            }
732        };
733        
734        // Create response frame
735        let mut response = Vec::new();
736        response.push(slave_id);
737        response.push(function_code);
738        response.extend_from_slice(&response_data);
739        
740        // Add CRC
741        let crc = Self::calculate_crc(&response);
742        response.extend_from_slice(&crc.to_le_bytes());
743        
744        Ok(response)
745    }
746    
747    /// Create RTU error response
748    fn create_rtu_error_response(slave_id: u8, function_code: u8, exception_code: u8) -> ModbusResult<Vec<u8>> {
749        let mut response = Vec::new();
750        response.push(slave_id);
751        response.push(function_code | 0x80); // Set exception bit
752        response.push(exception_code);
753        
754        let crc = Self::calculate_crc(&response);
755        response.extend_from_slice(&crc.to_le_bytes());
756        
757        Ok(response)
758    }
759    
760    /// Handle RTU communication loop
761    async fn handle_rtu_communication(
762        mut port: tokio_serial::SerialStream,
763        register_bank: Arc<ModbusRegisterBank>,
764        stats: Arc<Mutex<ServerStats>>,
765        mut shutdown_rx: broadcast::Receiver<()>,
766        frame_gap: Duration,
767    ) {
768        info!("🔌 RTU server communication started");
769        
770        let mut buffer = vec![0u8; 256];
771        let mut frame_buffer = Vec::new();
772        let mut last_activity = std::time::Instant::now();
773        
774        loop {
775            tokio::select! {
776                _ = shutdown_rx.recv() => {
777                    debug!("Shutdown signal received for RTU server");
778                    break;
779                }
780                
781                result = tokio::time::timeout(Duration::from_millis(100), port.read(&mut buffer)) => {
782                    match result {
783                        Ok(Ok(bytes_read)) if bytes_read > 0 => {
784                            let now = std::time::Instant::now();
785                            
786                            // Check for frame gap
787                            if now.duration_since(last_activity) > frame_gap && !frame_buffer.is_empty() {
788                                // Process accumulated frame
789                                Self::process_accumulated_frame(
790                                    &frame_buffer,
791                                    &mut port,
792                                    &register_bank,
793                                    &stats
794                                ).await;
795                                frame_buffer.clear();
796                            }
797                            
798                            // Accumulate data
799                            frame_buffer.extend_from_slice(&buffer[..bytes_read]);
800                            last_activity = now;
801                            
802                            // Update stats
803                            {
804                                let mut stats = stats.lock().await;
805                                stats.bytes_received += bytes_read as u64;
806                            }
807                        }
808                        Ok(Ok(_)) => {
809                            // No data read, but successful read operation
810                        }
811                        Ok(Err(e)) => {
812                            error!("RTU read error: {}", e);
813                            break;
814                        }
815                        Err(_) => {
816                            // Timeout - check if we have a complete frame
817                            let now = std::time::Instant::now();
818                            if !frame_buffer.is_empty() && now.duration_since(last_activity) > frame_gap {
819                                Self::process_accumulated_frame(
820                                    &frame_buffer,
821                                    &mut port,
822                                    &register_bank,
823                                    &stats
824                                ).await;
825                                frame_buffer.clear();
826                            }
827                        }
828                    }
829                }
830            }
831        }
832        
833        info!("🔌 RTU server communication stopped");
834    }
835    
836    /// Process accumulated frame data
837    async fn process_accumulated_frame(
838        frame: &[u8],
839        port: &mut tokio_serial::SerialStream,
840        register_bank: &Arc<ModbusRegisterBank>,
841        stats: &Arc<Mutex<ServerStats>>,
842    ) {
843        // Update request stats
844        {
845            let mut stats = stats.lock().await;
846            stats.total_requests += 1;
847        }
848        
849        match Self::process_rtu_frame(frame, register_bank).await {
850            Ok(response) => {
851                if let Err(e) = port.write_all(&response).await {
852                    error!("Failed to send RTU response: {}", e);
853                    let mut stats = stats.lock().await;
854                    stats.failed_requests += 1;
855                } else {
856                    let mut stats = stats.lock().await;
857                    stats.successful_requests += 1;
858                    stats.bytes_sent += response.len() as u64;
859                }
860            }
861            Err(e) => {
862                error!("Error processing RTU request: {}", e);
863                
864                // Try to send error response if we can determine slave ID and function
865                if frame.len() >= 2 {
866                    if let Ok(error_response) = Self::create_rtu_error_response(frame[0], frame[1], 0x01) {
867                        let _ = port.write_all(&error_response).await;
868                    }
869                }
870                
871                let mut stats = stats.lock().await;
872                stats.failed_requests += 1;
873            }
874        }
875    }
876    
877    // Reuse the same handler methods from TCP server
878    async fn handle_read_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
879        ModbusTcpServer::handle_read_coils(data, register_bank).await
880    }
881    
882    async fn handle_read_discrete_inputs(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
883        ModbusTcpServer::handle_read_discrete_inputs(data, register_bank).await
884    }
885    
886    async fn handle_read_holding_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
887        ModbusTcpServer::handle_read_holding_registers(data, register_bank).await
888    }
889    
890    async fn handle_read_input_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
891        ModbusTcpServer::handle_read_input_registers(data, register_bank).await
892    }
893    
894    async fn handle_write_single_coil(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
895        ModbusTcpServer::handle_write_single_coil(data, register_bank).await
896    }
897    
898    async fn handle_write_single_register(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
899        ModbusTcpServer::handle_write_single_register(data, register_bank).await
900    }
901    
902    async fn handle_write_multiple_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
903        ModbusTcpServer::handle_write_multiple_coils(data, register_bank).await
904    }
905    
906    async fn handle_write_multiple_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
907        ModbusTcpServer::handle_write_multiple_registers(data, register_bank).await
908    }
909}
910
911/// Modbus RTU server implementation
912/// 
913/// Note: This is a placeholder for future implementation
914#[async_trait]
915impl ModbusServer for ModbusRtuServer {
916    async fn start(&mut self) -> ModbusResult<()> {
917        let mut is_running = self.is_running.lock().await;
918        if *is_running {
919            return Err(ModbusError::protocol("RTU Server is already running"));
920        }
921        
922        info!("🚀 Starting Modbus RTU server on {}", self.config.port);
923        
924        // Create serial port connection
925        let port = tokio_serial::SerialStream::open(&tokio_serial::new(&self.config.port, self.config.baud_rate)
926            .data_bits(self.config.data_bits)
927            .stop_bits(self.config.stop_bits)
928            .parity(self.config.parity)
929            .timeout(self.config.timeout))
930            .map_err(|e| ModbusError::connection(format!("Failed to open serial port {}: {}", self.config.port, e)))?;
931        
932        let (shutdown_tx, _) = broadcast::channel(1);
933        self.shutdown_tx = Some(shutdown_tx.clone());
934        self.start_time = Some(std::time::Instant::now());
935        
936        *is_running = true;
937        drop(is_running);
938        
939        info!("✅ Modbus RTU server started successfully");
940        info!("📊 Server configuration:");
941        info!("   - Port: {}", self.config.port);
942        info!("   - Baud rate: {}", self.config.baud_rate);
943        info!("   - Data bits: {:?}", self.config.data_bits);
944        info!("   - Stop bits: {:?}", self.config.stop_bits);
945        info!("   - Parity: {:?}", self.config.parity);
946        info!("   - Timeout: {:?}", self.config.timeout);
947        
948        let register_bank = self.register_bank.clone();
949        let stats = self.stats.clone();
950        let frame_gap = self.config.frame_gap;
951        let is_running_flag = self.is_running.clone();
952        let shutdown_rx = shutdown_tx.subscribe();
953        
954        tokio::spawn(async move {
955            Self::handle_rtu_communication(port, register_bank, stats, shutdown_rx, frame_gap).await;
956            
957            let mut is_running = is_running_flag.lock().await;
958            *is_running = false;
959        });
960        
961        Ok(())
962    }
963    
964    async fn stop(&mut self) -> ModbusResult<()> {
965        if let Some(shutdown_tx) = &self.shutdown_tx {
966            let _ = shutdown_tx.send(());
967        }
968        
969        let mut is_running = self.is_running.lock().await;
970        *is_running = false;
971        
972        info!("âšī¸  Modbus RTU server stopped");
973        Ok(())
974    }
975    
976    fn is_running(&self) -> bool {
977        // Note: This is a synchronous method, so we can't use async lock
978        // In a real implementation, you might want to use a different approach
979        false // Placeholder
980    }
981    
982    fn get_stats(&self) -> ServerStats {
983        // Note: This is a synchronous method, so we can't use async lock
984        // In a real implementation, you might want to use a different approach
985        let mut stats = ServerStats::default();
986        
987        if let Some(start_time) = self.start_time {
988            stats.uptime_seconds = start_time.elapsed().as_secs();
989        }
990        
991        stats.register_bank_stats = Some(self.register_bank.get_stats());
992        stats
993    }
994    
995    fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>> {
996        Some(self.register_bank.clone())
997    }
998}
999
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A newly built TCP server is idle and exposes a register bank.
    #[test]
    fn test_tcp_server_creation() {
        let created = ModbusTcpServer::new("127.0.0.1:5020");
        assert!(created.is_ok());

        let tcp_server = created.unwrap();
        assert!(!tcp_server.is_running());
        assert!(tcp_server.get_register_bank().is_some());
    }

    /// A newly built RTU server is idle and exposes a register bank.
    #[test]
    fn test_rtu_server_creation() {
        let created = ModbusRtuServer::new("/dev/ttyUSB0", 9600);
        assert!(created.is_ok());

        let rtu_server = created.unwrap();
        assert!(!rtu_server.is_running());
        assert!(rtu_server.get_register_bank().is_some());
    }

    /// An RTU server can be built from an explicit configuration.
    #[test]
    fn test_rtu_server_configuration() {
        let created = ModbusRtuServer::with_config(ModbusRtuServerConfig {
            port: "/dev/ttyUSB0".to_string(),
            baud_rate: 19200,
            data_bits: tokio_serial::DataBits::Eight,
            stop_bits: tokio_serial::StopBits::Two,
            parity: tokio_serial::Parity::Even,
            timeout: Duration::from_secs(2),
            frame_gap: Duration::from_millis(5),
            register_bank: None,
        });
        assert!(created.is_ok());

        let rtu_server = created.unwrap();
        assert!(!rtu_server.is_running());
    }

    /// Start/stop lifecycle; starting is allowed to fail when no serial port exists.
    #[tokio::test]
    async fn test_rtu_server_lifecycle() {
        let mut rtu_server = ModbusRtuServer::new("/dev/ttyUSB0", 9600).unwrap();

        // Nothing has been started yet.
        assert!(!rtu_server.is_running());

        match rtu_server.start().await {
            Ok(()) => {
                // Start worked (real hardware present): the server must also stop cleanly.
                tokio::time::sleep(Duration::from_millis(10)).await;
                assert!(rtu_server.stop().await.is_ok());
            }
            failure => {
                // The usual outcome on machines without the serial device.
                println!("RTU server start failed (expected without serial port): {:?}", failure.err());
            }
        }
    }

    /// The CRC is deterministic and differs between two different payloads.
    #[test]
    fn test_crc_calculation() {
        let first_payload = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x02];
        let second_payload = vec![0x01, 0x04, 0x00, 0x00, 0x00, 0x01];

        let first_crc = ModbusRtuServer::calculate_crc(&first_payload);
        let second_crc = ModbusRtuServer::calculate_crc(&second_payload);

        // Same input, same checksum.
        assert_eq!(first_crc, ModbusRtuServer::calculate_crc(&first_payload));
        // Different input, different checksum.
        assert_ne!(first_crc, second_crc);
    }

    /// An exception reply is slave id, flagged function code, exception code, then a 2-byte CRC.
    #[test]
    fn test_rtu_error_response() {
        let built = ModbusRtuServer::create_rtu_error_response(0x01, 0x03, 0x01);
        assert!(built.is_ok());

        let reply = built.unwrap();
        assert_eq!(reply.len(), 5); // Slave + Function + Exception + CRC (2 bytes)
        assert_eq!(reply[0], 0x01); // Slave ID
        assert_eq!(reply[1], 0x83); // Function code with error bit
        assert_eq!(reply[2], 0x01); // Exception code
    }

    /// All request/connection counters of a new TCP server start at zero.
    #[tokio::test]
    async fn test_server_stats() {
        let tcp_server = ModbusTcpServer::new("127.0.0.1:5021").unwrap();
        let snapshot = tcp_server.get_stats();

        let counters = [
            snapshot.connections_count,
            snapshot.total_requests,
            snapshot.successful_requests,
            snapshot.failed_requests,
        ];
        for counter in counters {
            assert_eq!(counter, 0);
        }
    }

    /// Values written to a custom register bank are visible through the server.
    #[test]
    fn test_register_bank_integration() {
        let shared_bank = Arc::new(ModbusRegisterBank::new());

        // Seed one coil and one holding register.
        shared_bank.write_single_coil(0, true).unwrap();
        shared_bank.write_single_register(0, 0x1234).unwrap();

        let mut tcp_server = ModbusTcpServer::new("127.0.0.1:5022").unwrap();
        tcp_server.set_register_bank(shared_bank.clone());

        // Read the values back through the bank the server hands out.
        let bank_from_server = tcp_server.get_register_bank().unwrap();
        let coils = bank_from_server.read_coils(0, 1).unwrap();
        let registers = bank_from_server.read_holding_registers(0, 1).unwrap();

        assert!(coils[0]);
        assert_eq!(registers[0], 0x1234);
    }
}