Skip to main content

rustmod_datalink/
server.rs

1use crate::DataLinkError;
2use rustmod_core::encoding::{Reader, Writer};
3use rustmod_core::frame::{rtu as rtu_frame, tcp};
4use rustmod_core::pdu::{DecodedRequest, ExceptionCode, ExceptionResponse};
5use rustmod_core::DecodeError;
6use std::sync::Arc;
7use thiserror::Error;
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
10use tracing::{debug, warn};
11
12#[cfg(feature = "metrics")]
13use std::sync::atomic::{AtomicU64, Ordering};
14
/// Default upper bound, in bytes, on a request or response PDU handled by the
/// servers in this file (used as the initial `max_pdu_len`).
const DEFAULT_MAX_PDU_LEN: usize = 253;
/// Default size, in bytes, of the buffer in which an RTU-over-TCP connection
/// accumulates incoming bytes while looking for a complete frame.
const DEFAULT_MAX_RTU_FRAME_LEN: usize = 256;
17
/// Failure reported by a [`ModbusService`] while handling a request.
///
/// The connection handlers in this file answer every variant with a Modbus
/// exception response; the attached message strings are never sent to the client.
#[derive(Debug, Error)]
pub enum ServiceError {
    /// Answer the client with exactly this exception code.
    #[error("modbus exception: {0:?}")]
    Exception(ExceptionCode),
    /// The request was rejected by the service; answered with
    /// `ExceptionCode::IllegalDataValue`.
    #[error("invalid request: {0}")]
    InvalidRequest(&'static str),
    /// The service failed internally; answered with
    /// `ExceptionCode::ServerDeviceFailure`.
    #[error("internal error: {0}")]
    Internal(&'static str),
}
27
/// Application logic behind a Modbus server: turns decoded requests into
/// response PDUs.
///
/// Implementations are shared between connection tasks through an `Arc`, hence
/// the `Send + Sync + 'static` bound.
pub trait ModbusService: Send + Sync + 'static {
    /// Handle a decoded request and write a response PDU into `response_pdu`.
    ///
    /// Return the number of bytes written. The response must include function
    /// code and payload, but not MBAP header bytes.
    ///
    /// The servers in this file pass a zero-filled buffer of `max_pdu_len`
    /// bytes and treat a returned length of `0` or more than `max_pdu_len` as
    /// a server failure. Returning `Err` makes the server send a Modbus
    /// exception response instead (see [`ServiceError`]).
    fn handle(
        &self,
        unit_id: u8,
        request: DecodedRequest<'_>,
        response_pdu: &mut [u8],
    ) -> Result<usize, ServiceError>;
}
40
41impl<T> ModbusService for Arc<T>
42where
43    T: ModbusService + ?Sized,
44{
45    fn handle(
46        &self,
47        unit_id: u8,
48        request: DecodedRequest<'_>,
49        response_pdu: &mut [u8],
50    ) -> Result<usize, ServiceError> {
51        (**self).handle(unit_id, request, response_pdu)
52    }
53}
54
/// Live, atomically updated counters shared between a server and its
/// connection tasks. Only compiled with the `metrics` feature.
#[cfg(feature = "metrics")]
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// Requests received (incremented once a full request frame has been read).
    requests_total: AtomicU64,
    /// Requests answered with a normal (non-exception) response.
    responses_ok: AtomicU64,
    /// Exception responses sent, whatever their cause.
    exceptions_sent: AtomicU64,
    /// Requests whose PDU could not be decoded.
    decode_errors: AtomicU64,
    /// Requests for which the service failed internally or returned an invalid length.
    internal_errors: AtomicU64,
}
64
/// Plain-value copy of the [`ServerMetrics`] counters taken at one point in time.
/// Only compiled with the `metrics` feature.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ServerMetricsSnapshot {
    /// Requests received.
    pub requests_total: u64,
    /// Requests answered with a normal (non-exception) response.
    pub responses_ok: u64,
    /// Exception responses sent.
    pub exceptions_sent: u64,
    /// Requests whose PDU could not be decoded.
    pub decode_errors: u64,
    /// Requests for which the service failed internally or returned an invalid length.
    pub internal_errors: u64,
}
74
#[cfg(feature = "metrics")]
impl ServerMetrics {
    /// Read every counter and return the values as a [`ServerMetricsSnapshot`].
    ///
    /// This is public so that a handle obtained from `metrics_handle()` stays
    /// useful after the server has been consumed by `run()`; without it the
    /// counters could not be read from outside this module once the server
    /// is running.
    ///
    /// Each counter is loaded independently with `Ordering::Relaxed`, so the
    /// snapshot is not an atomic view across counters: requests handled while
    /// the snapshot is being taken may be reflected in some fields only.
    pub fn snapshot(&self) -> ServerMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ServerMetricsSnapshot {
            requests_total: read(&self.requests_total),
            responses_ok: read(&self.responses_ok),
            exceptions_sent: read(&self.exceptions_sent),
            decode_errors: read(&self.decode_errors),
            internal_errors: read(&self.internal_errors),
        }
    }
}
87
/// Modbus TCP (MBAP-framed) server that accepts connections on a listener and
/// dispatches every decoded request to a shared service of type `S`.
pub struct ModbusTcpServer<S> {
    /// Listening socket that `run` accepts client connections from.
    listener: TcpListener,
    /// Service shared by all connection tasks.
    service: Arc<S>,
    /// Largest request/response PDU, in bytes, accepted on a connection.
    max_pdu_len: usize,
    /// Counters shared with every connection task (`metrics` feature only).
    #[cfg(feature = "metrics")]
    metrics: Arc<ServerMetrics>,
}
95
96impl<S: ModbusService> ModbusTcpServer<S> {
97    pub async fn bind<A: ToSocketAddrs>(addr: A, service: S) -> Result<Self, DataLinkError> {
98        let listener = TcpListener::bind(addr).await?;
99        Ok(Self::from_listener(listener, service))
100    }
101
102    pub fn from_listener(listener: TcpListener, service: S) -> Self {
103        Self {
104            listener,
105            service: Arc::new(service),
106            max_pdu_len: DEFAULT_MAX_PDU_LEN,
107            #[cfg(feature = "metrics")]
108            metrics: Arc::new(ServerMetrics::default()),
109        }
110    }
111
112    pub fn local_addr(&self) -> Result<std::net::SocketAddr, DataLinkError> {
113        Ok(self.listener.local_addr()?)
114    }
115
116    pub fn with_max_pdu_len(mut self, max_pdu_len: usize) -> Self {
117        self.max_pdu_len = max_pdu_len;
118        self
119    }
120
121    #[cfg(feature = "metrics")]
122    pub fn metrics_handle(&self) -> Arc<ServerMetrics> {
123        Arc::clone(&self.metrics)
124    }
125
126    #[cfg(feature = "metrics")]
127    pub fn metrics_snapshot(&self) -> ServerMetricsSnapshot {
128        self.metrics.snapshot()
129    }
130
131    pub async fn run(self) -> Result<(), DataLinkError> {
132        loop {
133            let (socket, peer) = self.listener.accept().await?;
134            let service = Arc::clone(&self.service);
135            let max_pdu_len = self.max_pdu_len;
136            #[cfg(feature = "metrics")]
137            let metrics = Arc::clone(&self.metrics);
138
139            tokio::spawn(async move {
140                if let Err(err) = handle_connection(
141                    socket,
142                    service,
143                    max_pdu_len,
144                    #[cfg(feature = "metrics")]
145                    metrics,
146                )
147                .await
148                {
149                    warn!(%peer, error = %err, "modbus tcp server connection ended with error");
150                }
151            });
152        }
153    }
154}
155
/// Server that speaks RTU-framed Modbus (unit id, PDU, CRC) over TCP
/// connections and dispatches every decoded request to a shared service of
/// type `S`.
pub struct ModbusRtuOverTcpServer<S> {
    /// Listening socket that `run` accepts client connections from.
    listener: TcpListener,
    /// Service shared by all connection tasks.
    service: Arc<S>,
    /// Largest request/response PDU, in bytes, accepted on a connection.
    max_pdu_len: usize,
    /// Size, in bytes, of the per-connection buffer used to find frame boundaries.
    max_frame_len: usize,
    /// Counters shared with every connection task (`metrics` feature only).
    #[cfg(feature = "metrics")]
    metrics: Arc<ServerMetrics>,
}
164
165impl<S: ModbusService> ModbusRtuOverTcpServer<S> {
166    pub async fn bind<A: ToSocketAddrs>(addr: A, service: S) -> Result<Self, DataLinkError> {
167        let listener = TcpListener::bind(addr).await?;
168        Ok(Self::from_listener(listener, service))
169    }
170
171    pub fn from_listener(listener: TcpListener, service: S) -> Self {
172        Self {
173            listener,
174            service: Arc::new(service),
175            max_pdu_len: DEFAULT_MAX_PDU_LEN,
176            max_frame_len: DEFAULT_MAX_RTU_FRAME_LEN,
177            #[cfg(feature = "metrics")]
178            metrics: Arc::new(ServerMetrics::default()),
179        }
180    }
181
182    pub fn local_addr(&self) -> Result<std::net::SocketAddr, DataLinkError> {
183        Ok(self.listener.local_addr()?)
184    }
185
186    pub fn with_max_pdu_len(mut self, max_pdu_len: usize) -> Self {
187        self.max_pdu_len = max_pdu_len;
188        self
189    }
190
191    pub fn with_max_frame_len(mut self, max_frame_len: usize) -> Self {
192        self.max_frame_len = max_frame_len;
193        self
194    }
195
196    #[cfg(feature = "metrics")]
197    pub fn metrics_handle(&self) -> Arc<ServerMetrics> {
198        Arc::clone(&self.metrics)
199    }
200
201    #[cfg(feature = "metrics")]
202    pub fn metrics_snapshot(&self) -> ServerMetricsSnapshot {
203        self.metrics.snapshot()
204    }
205
206    pub async fn run(self) -> Result<(), DataLinkError> {
207        loop {
208            let (socket, peer) = self.listener.accept().await?;
209            let service = Arc::clone(&self.service);
210            let max_pdu_len = self.max_pdu_len;
211            let max_frame_len = self.max_frame_len;
212            #[cfg(feature = "metrics")]
213            let metrics = Arc::clone(&self.metrics);
214
215            tokio::spawn(async move {
216                if let Err(err) = handle_rtu_over_tcp_connection(
217                    socket,
218                    service,
219                    max_pdu_len,
220                    max_frame_len,
221                    #[cfg(feature = "metrics")]
222                    metrics,
223                )
224                .await
225                {
226                    warn!(
227                        %peer,
228                        error = %err,
229                        "modbus rtu-over-tcp server connection ended with error"
230                    );
231                }
232            });
233        }
234    }
235}
236
237async fn handle_connection<S: ModbusService>(
238    mut socket: TcpStream,
239    service: Arc<S>,
240    max_pdu_len: usize,
241    #[cfg(feature = "metrics")] metrics: Arc<ServerMetrics>,
242) -> Result<(), DataLinkError> {
243    loop {
244        let mut mbap = [0u8; tcp::MBAP_HEADER_LEN];
245        if let Err(err) = socket.read_exact(&mut mbap).await {
246            if err.kind() == std::io::ErrorKind::UnexpectedEof {
247                return Ok(());
248            }
249            return Err(DataLinkError::Io(err));
250        }
251
252        let mut mbap_reader = Reader::new(&mbap);
253        let header = tcp::MbapHeader::decode(&mut mbap_reader)?;
254        let pdu_len = usize::from(header.length)
255            .checked_sub(1)
256            .ok_or(DataLinkError::InvalidResponse("invalid mbap length"))?;
257
258        if pdu_len == 0 || pdu_len > max_pdu_len {
259            return Err(DataLinkError::InvalidResponse("invalid request pdu length"));
260        }
261
262        let mut request_pdu = vec![0u8; pdu_len];
263        socket.read_exact(&mut request_pdu).await?;
264
265        #[cfg(feature = "metrics")]
266        metrics.requests_total.fetch_add(1, Ordering::Relaxed);
267
268        let mut request_reader = Reader::new(&request_pdu);
269        let decoded = match DecodedRequest::decode(&mut request_reader) {
270            Ok(req) if request_reader.is_empty() => req,
271            Ok(_) => {
272                #[cfg(feature = "metrics")]
273                {
274                    metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
275                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
276                }
277                let function = request_pdu[0] & 0x7F;
278                send_exception(
279                    &mut socket,
280                    header.transaction_id,
281                    header.unit_id,
282                    function,
283                    ExceptionCode::IllegalDataValue,
284                )
285                .await?;
286                continue;
287            }
288            Err(err) => {
289                #[cfg(feature = "metrics")]
290                {
291                    metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
292                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
293                }
294                let function = request_pdu.first().copied().unwrap_or(0) & 0x7F;
295                send_exception(
296                    &mut socket,
297                    header.transaction_id,
298                    header.unit_id,
299                    function,
300                    map_decode_error_to_exception(err),
301                )
302                .await?;
303                continue;
304            }
305        };
306
307        debug!(
308            correlation_id = header.transaction_id,
309            unit_id = header.unit_id,
310            function = decoded.function_code().as_u8(),
311            pdu_len,
312            "received modbus tcp request"
313        );
314
315        let mut response_pdu = vec![0u8; max_pdu_len];
316        match service.handle(header.unit_id, decoded, &mut response_pdu) {
317            Ok(response_len) => {
318                if response_len == 0 || response_len > max_pdu_len {
319                    #[cfg(feature = "metrics")]
320                    {
321                        metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
322                        metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
323                    }
324                    send_exception(
325                        &mut socket,
326                        header.transaction_id,
327                        header.unit_id,
328                        decoded.function_code().as_u8(),
329                        ExceptionCode::ServerDeviceFailure,
330                    )
331                    .await?;
332                    continue;
333                }
334
335                #[cfg(feature = "metrics")]
336                metrics.responses_ok.fetch_add(1, Ordering::Relaxed);
337
338                send_pdu(
339                    &mut socket,
340                    header.transaction_id,
341                    header.unit_id,
342                    &response_pdu[..response_len],
343                )
344                .await?;
345            }
346            Err(ServiceError::Exception(code)) => {
347                #[cfg(feature = "metrics")]
348                metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
349
350                send_exception(
351                    &mut socket,
352                    header.transaction_id,
353                    header.unit_id,
354                    decoded.function_code().as_u8(),
355                    code,
356                )
357                .await?;
358            }
359            Err(ServiceError::InvalidRequest(_)) => {
360                #[cfg(feature = "metrics")]
361                metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
362
363                send_exception(
364                    &mut socket,
365                    header.transaction_id,
366                    header.unit_id,
367                    decoded.function_code().as_u8(),
368                    ExceptionCode::IllegalDataValue,
369                )
370                .await?;
371            }
372            Err(ServiceError::Internal(_)) => {
373                #[cfg(feature = "metrics")]
374                {
375                    metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
376                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
377                }
378
379                send_exception(
380                    &mut socket,
381                    header.transaction_id,
382                    header.unit_id,
383                    decoded.function_code().as_u8(),
384                    ExceptionCode::ServerDeviceFailure,
385                )
386                .await?;
387            }
388        }
389    }
390}
391
392fn decode_rtu_suffix_frame(buffer: &[u8]) -> Option<(usize, u8, &[u8])> {
393    if buffer.len() < 4 {
394        return None;
395    }
396    for start in 0..=buffer.len() - 4 {
397        if let Ok((unit_id, pdu)) = rtu_frame::decode_frame(&buffer[start..]) {
398            return Some((start, unit_id, pdu));
399        }
400    }
401    None
402}
403
404async fn handle_rtu_over_tcp_connection<S: ModbusService>(
405    mut socket: TcpStream,
406    service: Arc<S>,
407    max_pdu_len: usize,
408    max_frame_len: usize,
409    #[cfg(feature = "metrics")] metrics: Arc<ServerMetrics>,
410) -> Result<(), DataLinkError> {
411    if max_frame_len < 4 {
412        return Err(DataLinkError::InvalidResponse(
413            "rtu frame length must be at least 4 bytes",
414        ));
415    }
416
417    let mut frame = vec![0u8; max_frame_len];
418    let mut len = 0usize;
419
420    loop {
421        if len == max_frame_len {
422            // Drop oldest byte so we can continue scanning for a valid frame boundary.
423            frame.copy_within(1..max_frame_len, 0);
424            len -= 1;
425        }
426
427        let n = socket.read(&mut frame[len..len + 1]).await?;
428        if n == 0 {
429            return Ok(());
430        }
431        len += n;
432
433        let Some((_, unit_id, request_pdu)) = decode_rtu_suffix_frame(&frame[..len]) else {
434            continue;
435        };
436        len = 0;
437
438        #[cfg(feature = "metrics")]
439        metrics.requests_total.fetch_add(1, Ordering::Relaxed);
440
441        if request_pdu.is_empty() || request_pdu.len() > max_pdu_len {
442            #[cfg(feature = "metrics")]
443            {
444                metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
445                metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
446            }
447            send_rtu_exception(&mut socket, unit_id, 0, ExceptionCode::IllegalDataValue).await?;
448            continue;
449        }
450
451        let mut request_reader = Reader::new(request_pdu);
452        let decoded = match DecodedRequest::decode(&mut request_reader) {
453            Ok(req) if request_reader.is_empty() => req,
454            Ok(_) => {
455                #[cfg(feature = "metrics")]
456                {
457                    metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
458                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
459                }
460                let function = request_pdu[0] & 0x7F;
461                send_rtu_exception(
462                    &mut socket,
463                    unit_id,
464                    function,
465                    ExceptionCode::IllegalDataValue,
466                )
467                .await?;
468                continue;
469            }
470            Err(err) => {
471                #[cfg(feature = "metrics")]
472                {
473                    metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
474                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
475                }
476                let function = request_pdu.first().copied().unwrap_or(0) & 0x7F;
477                send_rtu_exception(
478                    &mut socket,
479                    unit_id,
480                    function,
481                    map_decode_error_to_exception(err),
482                )
483                .await?;
484                continue;
485            }
486        };
487
488        debug!(
489            unit_id,
490            function = decoded.function_code().as_u8(),
491            pdu_len = request_pdu.len(),
492            "received modbus rtu-over-tcp request"
493        );
494
495        let mut response_pdu = vec![0u8; max_pdu_len];
496        match service.handle(unit_id, decoded, &mut response_pdu) {
497            Ok(response_len) => {
498                if response_len == 0 || response_len > max_pdu_len {
499                    #[cfg(feature = "metrics")]
500                    {
501                        metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
502                        metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
503                    }
504                    send_rtu_exception(
505                        &mut socket,
506                        unit_id,
507                        decoded.function_code().as_u8(),
508                        ExceptionCode::ServerDeviceFailure,
509                    )
510                    .await?;
511                    continue;
512                }
513
514                #[cfg(feature = "metrics")]
515                metrics.responses_ok.fetch_add(1, Ordering::Relaxed);
516
517                send_rtu_pdu(&mut socket, unit_id, &response_pdu[..response_len]).await?;
518            }
519            Err(ServiceError::Exception(code)) => {
520                #[cfg(feature = "metrics")]
521                metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
522
523                send_rtu_exception(&mut socket, unit_id, decoded.function_code().as_u8(), code)
524                    .await?;
525            }
526            Err(ServiceError::InvalidRequest(_)) => {
527                #[cfg(feature = "metrics")]
528                metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
529
530                send_rtu_exception(
531                    &mut socket,
532                    unit_id,
533                    decoded.function_code().as_u8(),
534                    ExceptionCode::IllegalDataValue,
535                )
536                .await?;
537            }
538            Err(ServiceError::Internal(_)) => {
539                #[cfg(feature = "metrics")]
540                {
541                    metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
542                    metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
543                }
544
545                send_rtu_exception(
546                    &mut socket,
547                    unit_id,
548                    decoded.function_code().as_u8(),
549                    ExceptionCode::ServerDeviceFailure,
550                )
551                .await?;
552            }
553        }
554    }
555}
556
557fn map_decode_error_to_exception(err: DecodeError) -> ExceptionCode {
558    match err {
559        DecodeError::InvalidFunctionCode => ExceptionCode::IllegalFunction,
560        DecodeError::InvalidLength | DecodeError::InvalidValue | DecodeError::UnexpectedEof => {
561            ExceptionCode::IllegalDataValue
562        }
563        DecodeError::InvalidCrc | DecodeError::Unsupported | DecodeError::Message(_) => {
564            ExceptionCode::ServerDeviceFailure
565        }
566    }
567}
568
569async fn send_exception(
570    socket: &mut TcpStream,
571    transaction_id: u16,
572    unit_id: u8,
573    function_code: u8,
574    exception_code: ExceptionCode,
575) -> Result<(), DataLinkError> {
576    let mut pdu = [0u8; 2];
577    let mut pdu_writer = Writer::new(&mut pdu);
578    ExceptionResponse {
579        function_code,
580        exception_code,
581    }
582    .encode(&mut pdu_writer)
583    .map_err(DataLinkError::Encode)?;
584
585    send_pdu(socket, transaction_id, unit_id, pdu_writer.as_written()).await
586}
587
588async fn send_pdu(
589    socket: &mut TcpStream,
590    transaction_id: u16,
591    unit_id: u8,
592    pdu: &[u8],
593) -> Result<(), DataLinkError> {
594    let mut frame = vec![0u8; tcp::MBAP_HEADER_LEN + pdu.len()];
595    let mut frame_writer = Writer::new(&mut frame);
596    tcp::encode_frame(&mut frame_writer, transaction_id, unit_id, pdu)?;
597
598    debug!(
599        correlation_id = transaction_id,
600        unit_id,
601        pdu_len = pdu.len(),
602        "sending modbus tcp server response"
603    );
604    socket.write_all(frame_writer.as_written()).await?;
605    Ok(())
606}
607
608async fn send_rtu_exception(
609    socket: &mut TcpStream,
610    unit_id: u8,
611    function_code: u8,
612    exception_code: ExceptionCode,
613) -> Result<(), DataLinkError> {
614    let mut pdu = [0u8; 2];
615    let mut pdu_writer = Writer::new(&mut pdu);
616    ExceptionResponse {
617        function_code,
618        exception_code,
619    }
620    .encode(&mut pdu_writer)
621    .map_err(DataLinkError::Encode)?;
622
623    send_rtu_pdu(socket, unit_id, pdu_writer.as_written()).await
624}
625
626async fn send_rtu_pdu(socket: &mut TcpStream, unit_id: u8, pdu: &[u8]) -> Result<(), DataLinkError> {
627    let mut frame = vec![0u8; pdu.len() + 3];
628    let mut writer = Writer::new(&mut frame);
629    rtu_frame::encode_frame(&mut writer, unit_id, pdu)?;
630    socket.write_all(writer.as_written()).await?;
631    Ok(())
632}
633
// End-to-end tests: each test starts a real server on an ephemeral loopback
// port, talks to it over TCP, checks the reply and then aborts the server task.
#[cfg(test)]
mod tests {
    use super::{ModbusRtuOverTcpServer, ModbusService, ModbusTcpServer, ServiceError};
    use crate::{DataLink, ModbusTcpTransport};
    use rustmod_core::encoding::Writer;
    use rustmod_core::frame::rtu as rtu_frame;
    use rustmod_core::pdu::{DecodedRequest, ExceptionCode};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    /// Test service that answers every Read Holding Registers request with the
    /// fixed PDU `[0x03, 0x02, 0x00, 0x2A]` and rejects everything else with
    /// `IllegalFunction`.
    struct EchoReadService;

    impl ModbusService for EchoReadService {
        fn handle(
            &self,
            _unit_id: u8,
            request: DecodedRequest<'_>,
            response_pdu: &mut [u8],
        ) -> Result<usize, ServiceError> {
            match request {
                DecodedRequest::ReadHoldingRegisters(_) => {
                    let bytes = [0x03u8, 0x02, 0x00, 0x2A];
                    response_pdu[..bytes.len()].copy_from_slice(&bytes);
                    Ok(bytes.len())
                }
                _ => Err(ServiceError::Exception(ExceptionCode::IllegalFunction)),
            }
        }
    }

    /// Test service that rejects every request with `IllegalDataAddress`.
    struct AlwaysExceptionService;

    impl ModbusService for AlwaysExceptionService {
        fn handle(
            &self,
            _unit_id: u8,
            _request: DecodedRequest<'_>,
            _response_pdu: &mut [u8],
        ) -> Result<usize, ServiceError> {
            Err(ServiceError::Exception(ExceptionCode::IllegalDataAddress))
        }
    }

    /// A well-formed read request gets the service's response PDU back unchanged.
    #[tokio::test]
    async fn tcp_server_handles_basic_read_request() {
        // Port 0 lets the OS pick a free port; `local_addr` reports which one.
        let server = ModbusTcpServer::bind("127.0.0.1:0", EchoReadService)
            .await
            .unwrap();
        let addr = server.local_addr().unwrap();
        let task = tokio::spawn(server.run());

        let transport = ModbusTcpTransport::connect(addr).await.unwrap();
        let mut response = [0u8; 32];
        let len = transport
            .exchange(1, &[0x03, 0x00, 0x00, 0x00, 0x01], &mut response)
            .await
            .unwrap();
        assert_eq!(&response[..len], &[0x03, 0x02, 0x00, 0x2A]);

        // `run` never returns on its own, so stop the server explicitly.
        task.abort();
        let _ = task.await;
    }

    /// A `ServiceError::Exception` from the service reaches the client as an
    /// exception PDU (function code with the high bit set, then the code).
    #[tokio::test]
    async fn tcp_server_sends_exception_response() {
        let server = ModbusTcpServer::bind("127.0.0.1:0", AlwaysExceptionService)
            .await
            .unwrap();
        let addr = server.local_addr().unwrap();
        let task = tokio::spawn(server.run());

        let transport = ModbusTcpTransport::connect(addr).await.unwrap();
        let mut response = [0u8; 32];
        let len = transport
            .exchange(1, &[0x03, 0x00, 0x00, 0x00, 0x01], &mut response)
            .await
            .unwrap();
        assert_eq!(&response[..len], &[0x83, 0x02]);

        task.abort();
        let _ = task.await;
    }

    /// A request PDU that fails to decode is answered with an exception
    /// instead of reaching the service or closing the connection.
    #[tokio::test]
    async fn tcp_server_maps_decode_error_to_exception() {
        let server = ModbusTcpServer::bind("127.0.0.1:0", EchoReadService)
            .await
            .unwrap();
        let addr = server.local_addr().unwrap();
        let task = tokio::spawn(server.run());

        let transport = ModbusTcpTransport::connect(addr).await.unwrap();
        let mut response = [0u8; 32];
        // Function 0x10 request whose payload the decoder is expected to reject.
        let len = transport
            .exchange(
                1,
                &[0x10, 0x00, 0x00, 0x00, 0x02, 0x03, 0x12, 0x34, 0x56],
                &mut response,
            )
            .await
            .unwrap();
        assert_eq!(&response[..len], &[0x90, 0x03]);

        task.abort();
        let _ = task.await;
    }

    /// Same as the basic TCP read test, but using RTU framing on a raw socket.
    #[tokio::test]
    async fn rtu_over_tcp_server_handles_basic_read_request() {
        let server = ModbusRtuOverTcpServer::bind("127.0.0.1:0", EchoReadService)
            .await
            .unwrap();
        let addr = server.local_addr().unwrap();
        let task = tokio::spawn(server.run());

        let mut stream = TcpStream::connect(addr).await.unwrap();
        let mut request = [0u8; 16];
        let mut writer = Writer::new(&mut request);
        rtu_frame::encode_frame(&mut writer, 1, &[0x03, 0x00, 0x00, 0x00, 0x01]).unwrap();
        stream.write_all(writer.as_written()).await.unwrap();

        // Expected reply frame: the 4-byte PDU plus 3 bytes of RTU framing.
        let mut response = [0u8; 7];
        stream.read_exact(&mut response).await.unwrap();
        let (unit_id, pdu) = rtu_frame::decode_frame(&response).unwrap();
        assert_eq!(unit_id, 1);
        assert_eq!(pdu, &[0x03, 0x02, 0x00, 0x2A]);

        task.abort();
        let _ = task.await;
    }

    /// Same as the TCP decode-error test, but using RTU framing on a raw socket.
    #[tokio::test]
    async fn rtu_over_tcp_server_maps_decode_error_to_exception() {
        let server = ModbusRtuOverTcpServer::bind("127.0.0.1:0", EchoReadService)
            .await
            .unwrap();
        let addr = server.local_addr().unwrap();
        let task = tokio::spawn(server.run());

        let mut stream = TcpStream::connect(addr).await.unwrap();
        let mut request = [0u8; 32];
        let mut writer = Writer::new(&mut request);
        rtu_frame::encode_frame(
            &mut writer,
            1,
            &[0x10, 0x00, 0x00, 0x00, 0x02, 0x03, 0x12, 0x34, 0x56],
        )
        .unwrap();
        stream.write_all(writer.as_written()).await.unwrap();

        // Expected reply frame: the 2-byte exception PDU plus 3 bytes of RTU framing.
        let mut response = [0u8; 5];
        stream.read_exact(&mut response).await.unwrap();
        let (unit_id, pdu) = rtu_frame::decode_frame(&response).unwrap();
        assert_eq!(unit_id, 1);
        assert_eq!(pdu, &[0x90, 0x03]);

        task.abort();
        let _ = task.await;
    }
}
793}