1use crate::DataLinkError;
2use rustmod_core::encoding::{Reader, Writer};
3use rustmod_core::frame::{rtu as rtu_frame, tcp};
4use rustmod_core::pdu::{DecodedRequest, ExceptionCode, ExceptionResponse};
5use rustmod_core::DecodeError;
6use std::sync::Arc;
7use thiserror::Error;
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
10use tracing::{debug, warn};
11
12#[cfg(feature = "metrics")]
13use std::sync::atomic::{AtomicU64, Ordering};
14
15const DEFAULT_MAX_PDU_LEN: usize = 253;
16const DEFAULT_MAX_RTU_FRAME_LEN: usize = 256;
17
18#[derive(Debug, Error)]
19pub enum ServiceError {
20 #[error("modbus exception: {0:?}")]
21 Exception(ExceptionCode),
22 #[error("invalid request: {0}")]
23 InvalidRequest(&'static str),
24 #[error("internal error: {0}")]
25 Internal(&'static str),
26}
27
28pub trait ModbusService: Send + Sync + 'static {
29 fn handle(
34 &self,
35 unit_id: u8,
36 request: DecodedRequest<'_>,
37 response_pdu: &mut [u8],
38 ) -> Result<usize, ServiceError>;
39}
40
41impl<T> ModbusService for Arc<T>
42where
43 T: ModbusService + ?Sized,
44{
45 fn handle(
46 &self,
47 unit_id: u8,
48 request: DecodedRequest<'_>,
49 response_pdu: &mut [u8],
50 ) -> Result<usize, ServiceError> {
51 (**self).handle(unit_id, request, response_pdu)
52 }
53}
54
55#[cfg(feature = "metrics")]
56#[derive(Debug, Default)]
57pub struct ServerMetrics {
58 requests_total: AtomicU64,
59 responses_ok: AtomicU64,
60 exceptions_sent: AtomicU64,
61 decode_errors: AtomicU64,
62 internal_errors: AtomicU64,
63}
64
65#[cfg(feature = "metrics")]
66#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
67pub struct ServerMetricsSnapshot {
68 pub requests_total: u64,
69 pub responses_ok: u64,
70 pub exceptions_sent: u64,
71 pub decode_errors: u64,
72 pub internal_errors: u64,
73}
74
75#[cfg(feature = "metrics")]
76impl ServerMetrics {
77 fn snapshot(&self) -> ServerMetricsSnapshot {
78 ServerMetricsSnapshot {
79 requests_total: self.requests_total.load(Ordering::Relaxed),
80 responses_ok: self.responses_ok.load(Ordering::Relaxed),
81 exceptions_sent: self.exceptions_sent.load(Ordering::Relaxed),
82 decode_errors: self.decode_errors.load(Ordering::Relaxed),
83 internal_errors: self.internal_errors.load(Ordering::Relaxed),
84 }
85 }
86}
87
88pub struct ModbusTcpServer<S> {
89 listener: TcpListener,
90 service: Arc<S>,
91 max_pdu_len: usize,
92 #[cfg(feature = "metrics")]
93 metrics: Arc<ServerMetrics>,
94}
95
96impl<S: ModbusService> ModbusTcpServer<S> {
97 pub async fn bind<A: ToSocketAddrs>(addr: A, service: S) -> Result<Self, DataLinkError> {
98 let listener = TcpListener::bind(addr).await?;
99 Ok(Self::from_listener(listener, service))
100 }
101
102 pub fn from_listener(listener: TcpListener, service: S) -> Self {
103 Self {
104 listener,
105 service: Arc::new(service),
106 max_pdu_len: DEFAULT_MAX_PDU_LEN,
107 #[cfg(feature = "metrics")]
108 metrics: Arc::new(ServerMetrics::default()),
109 }
110 }
111
112 pub fn local_addr(&self) -> Result<std::net::SocketAddr, DataLinkError> {
113 Ok(self.listener.local_addr()?)
114 }
115
116 pub fn with_max_pdu_len(mut self, max_pdu_len: usize) -> Self {
117 self.max_pdu_len = max_pdu_len;
118 self
119 }
120
121 #[cfg(feature = "metrics")]
122 pub fn metrics_handle(&self) -> Arc<ServerMetrics> {
123 Arc::clone(&self.metrics)
124 }
125
126 #[cfg(feature = "metrics")]
127 pub fn metrics_snapshot(&self) -> ServerMetricsSnapshot {
128 self.metrics.snapshot()
129 }
130
131 pub async fn run(self) -> Result<(), DataLinkError> {
132 loop {
133 let (socket, peer) = self.listener.accept().await?;
134 let service = Arc::clone(&self.service);
135 let max_pdu_len = self.max_pdu_len;
136 #[cfg(feature = "metrics")]
137 let metrics = Arc::clone(&self.metrics);
138
139 tokio::spawn(async move {
140 if let Err(err) = handle_connection(
141 socket,
142 service,
143 max_pdu_len,
144 #[cfg(feature = "metrics")]
145 metrics,
146 )
147 .await
148 {
149 warn!(%peer, error = %err, "modbus tcp server connection ended with error");
150 }
151 });
152 }
153 }
154}
155
156pub struct ModbusRtuOverTcpServer<S> {
157 listener: TcpListener,
158 service: Arc<S>,
159 max_pdu_len: usize,
160 max_frame_len: usize,
161 #[cfg(feature = "metrics")]
162 metrics: Arc<ServerMetrics>,
163}
164
165impl<S: ModbusService> ModbusRtuOverTcpServer<S> {
166 pub async fn bind<A: ToSocketAddrs>(addr: A, service: S) -> Result<Self, DataLinkError> {
167 let listener = TcpListener::bind(addr).await?;
168 Ok(Self::from_listener(listener, service))
169 }
170
171 pub fn from_listener(listener: TcpListener, service: S) -> Self {
172 Self {
173 listener,
174 service: Arc::new(service),
175 max_pdu_len: DEFAULT_MAX_PDU_LEN,
176 max_frame_len: DEFAULT_MAX_RTU_FRAME_LEN,
177 #[cfg(feature = "metrics")]
178 metrics: Arc::new(ServerMetrics::default()),
179 }
180 }
181
182 pub fn local_addr(&self) -> Result<std::net::SocketAddr, DataLinkError> {
183 Ok(self.listener.local_addr()?)
184 }
185
186 pub fn with_max_pdu_len(mut self, max_pdu_len: usize) -> Self {
187 self.max_pdu_len = max_pdu_len;
188 self
189 }
190
191 pub fn with_max_frame_len(mut self, max_frame_len: usize) -> Self {
192 self.max_frame_len = max_frame_len;
193 self
194 }
195
196 #[cfg(feature = "metrics")]
197 pub fn metrics_handle(&self) -> Arc<ServerMetrics> {
198 Arc::clone(&self.metrics)
199 }
200
201 #[cfg(feature = "metrics")]
202 pub fn metrics_snapshot(&self) -> ServerMetricsSnapshot {
203 self.metrics.snapshot()
204 }
205
206 pub async fn run(self) -> Result<(), DataLinkError> {
207 loop {
208 let (socket, peer) = self.listener.accept().await?;
209 let service = Arc::clone(&self.service);
210 let max_pdu_len = self.max_pdu_len;
211 let max_frame_len = self.max_frame_len;
212 #[cfg(feature = "metrics")]
213 let metrics = Arc::clone(&self.metrics);
214
215 tokio::spawn(async move {
216 if let Err(err) = handle_rtu_over_tcp_connection(
217 socket,
218 service,
219 max_pdu_len,
220 max_frame_len,
221 #[cfg(feature = "metrics")]
222 metrics,
223 )
224 .await
225 {
226 warn!(
227 %peer,
228 error = %err,
229 "modbus rtu-over-tcp server connection ended with error"
230 );
231 }
232 });
233 }
234 }
235}
236
237async fn handle_connection<S: ModbusService>(
238 mut socket: TcpStream,
239 service: Arc<S>,
240 max_pdu_len: usize,
241 #[cfg(feature = "metrics")] metrics: Arc<ServerMetrics>,
242) -> Result<(), DataLinkError> {
243 loop {
244 let mut mbap = [0u8; tcp::MBAP_HEADER_LEN];
245 if let Err(err) = socket.read_exact(&mut mbap).await {
246 if err.kind() == std::io::ErrorKind::UnexpectedEof {
247 return Ok(());
248 }
249 return Err(DataLinkError::Io(err));
250 }
251
252 let mut mbap_reader = Reader::new(&mbap);
253 let header = tcp::MbapHeader::decode(&mut mbap_reader)?;
254 let pdu_len = usize::from(header.length)
255 .checked_sub(1)
256 .ok_or(DataLinkError::InvalidResponse("invalid mbap length"))?;
257
258 if pdu_len == 0 || pdu_len > max_pdu_len {
259 return Err(DataLinkError::InvalidResponse("invalid request pdu length"));
260 }
261
262 let mut request_pdu = vec![0u8; pdu_len];
263 socket.read_exact(&mut request_pdu).await?;
264
265 #[cfg(feature = "metrics")]
266 metrics.requests_total.fetch_add(1, Ordering::Relaxed);
267
268 let mut request_reader = Reader::new(&request_pdu);
269 let decoded = match DecodedRequest::decode(&mut request_reader) {
270 Ok(req) if request_reader.is_empty() => req,
271 Ok(_) => {
272 #[cfg(feature = "metrics")]
273 {
274 metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
275 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
276 }
277 let function = request_pdu[0] & 0x7F;
278 send_exception(
279 &mut socket,
280 header.transaction_id,
281 header.unit_id,
282 function,
283 ExceptionCode::IllegalDataValue,
284 )
285 .await?;
286 continue;
287 }
288 Err(err) => {
289 #[cfg(feature = "metrics")]
290 {
291 metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
292 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
293 }
294 let function = request_pdu.first().copied().unwrap_or(0) & 0x7F;
295 send_exception(
296 &mut socket,
297 header.transaction_id,
298 header.unit_id,
299 function,
300 map_decode_error_to_exception(err),
301 )
302 .await?;
303 continue;
304 }
305 };
306
307 debug!(
308 correlation_id = header.transaction_id,
309 unit_id = header.unit_id,
310 function = decoded.function_code().as_u8(),
311 pdu_len,
312 "received modbus tcp request"
313 );
314
315 let mut response_pdu = vec![0u8; max_pdu_len];
316 match service.handle(header.unit_id, decoded, &mut response_pdu) {
317 Ok(response_len) => {
318 if response_len == 0 || response_len > max_pdu_len {
319 #[cfg(feature = "metrics")]
320 {
321 metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
322 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
323 }
324 send_exception(
325 &mut socket,
326 header.transaction_id,
327 header.unit_id,
328 decoded.function_code().as_u8(),
329 ExceptionCode::ServerDeviceFailure,
330 )
331 .await?;
332 continue;
333 }
334
335 #[cfg(feature = "metrics")]
336 metrics.responses_ok.fetch_add(1, Ordering::Relaxed);
337
338 send_pdu(
339 &mut socket,
340 header.transaction_id,
341 header.unit_id,
342 &response_pdu[..response_len],
343 )
344 .await?;
345 }
346 Err(ServiceError::Exception(code)) => {
347 #[cfg(feature = "metrics")]
348 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
349
350 send_exception(
351 &mut socket,
352 header.transaction_id,
353 header.unit_id,
354 decoded.function_code().as_u8(),
355 code,
356 )
357 .await?;
358 }
359 Err(ServiceError::InvalidRequest(_)) => {
360 #[cfg(feature = "metrics")]
361 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
362
363 send_exception(
364 &mut socket,
365 header.transaction_id,
366 header.unit_id,
367 decoded.function_code().as_u8(),
368 ExceptionCode::IllegalDataValue,
369 )
370 .await?;
371 }
372 Err(ServiceError::Internal(_)) => {
373 #[cfg(feature = "metrics")]
374 {
375 metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
376 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
377 }
378
379 send_exception(
380 &mut socket,
381 header.transaction_id,
382 header.unit_id,
383 decoded.function_code().as_u8(),
384 ExceptionCode::ServerDeviceFailure,
385 )
386 .await?;
387 }
388 }
389 }
390}
391
392fn decode_rtu_suffix_frame(buffer: &[u8]) -> Option<(usize, u8, &[u8])> {
393 if buffer.len() < 4 {
394 return None;
395 }
396 for start in 0..=buffer.len() - 4 {
397 if let Ok((unit_id, pdu)) = rtu_frame::decode_frame(&buffer[start..]) {
398 return Some((start, unit_id, pdu));
399 }
400 }
401 None
402}
403
404async fn handle_rtu_over_tcp_connection<S: ModbusService>(
405 mut socket: TcpStream,
406 service: Arc<S>,
407 max_pdu_len: usize,
408 max_frame_len: usize,
409 #[cfg(feature = "metrics")] metrics: Arc<ServerMetrics>,
410) -> Result<(), DataLinkError> {
411 if max_frame_len < 4 {
412 return Err(DataLinkError::InvalidResponse(
413 "rtu frame length must be at least 4 bytes",
414 ));
415 }
416
417 let mut frame = vec![0u8; max_frame_len];
418 let mut len = 0usize;
419
420 loop {
421 if len == max_frame_len {
422 frame.copy_within(1..max_frame_len, 0);
424 len -= 1;
425 }
426
427 let n = socket.read(&mut frame[len..len + 1]).await?;
428 if n == 0 {
429 return Ok(());
430 }
431 len += n;
432
433 let Some((_, unit_id, request_pdu)) = decode_rtu_suffix_frame(&frame[..len]) else {
434 continue;
435 };
436 len = 0;
437
438 #[cfg(feature = "metrics")]
439 metrics.requests_total.fetch_add(1, Ordering::Relaxed);
440
441 if request_pdu.is_empty() || request_pdu.len() > max_pdu_len {
442 #[cfg(feature = "metrics")]
443 {
444 metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
445 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
446 }
447 send_rtu_exception(&mut socket, unit_id, 0, ExceptionCode::IllegalDataValue).await?;
448 continue;
449 }
450
451 let mut request_reader = Reader::new(request_pdu);
452 let decoded = match DecodedRequest::decode(&mut request_reader) {
453 Ok(req) if request_reader.is_empty() => req,
454 Ok(_) => {
455 #[cfg(feature = "metrics")]
456 {
457 metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
458 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
459 }
460 let function = request_pdu[0] & 0x7F;
461 send_rtu_exception(
462 &mut socket,
463 unit_id,
464 function,
465 ExceptionCode::IllegalDataValue,
466 )
467 .await?;
468 continue;
469 }
470 Err(err) => {
471 #[cfg(feature = "metrics")]
472 {
473 metrics.decode_errors.fetch_add(1, Ordering::Relaxed);
474 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
475 }
476 let function = request_pdu.first().copied().unwrap_or(0) & 0x7F;
477 send_rtu_exception(
478 &mut socket,
479 unit_id,
480 function,
481 map_decode_error_to_exception(err),
482 )
483 .await?;
484 continue;
485 }
486 };
487
488 debug!(
489 unit_id,
490 function = decoded.function_code().as_u8(),
491 pdu_len = request_pdu.len(),
492 "received modbus rtu-over-tcp request"
493 );
494
495 let mut response_pdu = vec![0u8; max_pdu_len];
496 match service.handle(unit_id, decoded, &mut response_pdu) {
497 Ok(response_len) => {
498 if response_len == 0 || response_len > max_pdu_len {
499 #[cfg(feature = "metrics")]
500 {
501 metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
502 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
503 }
504 send_rtu_exception(
505 &mut socket,
506 unit_id,
507 decoded.function_code().as_u8(),
508 ExceptionCode::ServerDeviceFailure,
509 )
510 .await?;
511 continue;
512 }
513
514 #[cfg(feature = "metrics")]
515 metrics.responses_ok.fetch_add(1, Ordering::Relaxed);
516
517 send_rtu_pdu(&mut socket, unit_id, &response_pdu[..response_len]).await?;
518 }
519 Err(ServiceError::Exception(code)) => {
520 #[cfg(feature = "metrics")]
521 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
522
523 send_rtu_exception(&mut socket, unit_id, decoded.function_code().as_u8(), code)
524 .await?;
525 }
526 Err(ServiceError::InvalidRequest(_)) => {
527 #[cfg(feature = "metrics")]
528 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
529
530 send_rtu_exception(
531 &mut socket,
532 unit_id,
533 decoded.function_code().as_u8(),
534 ExceptionCode::IllegalDataValue,
535 )
536 .await?;
537 }
538 Err(ServiceError::Internal(_)) => {
539 #[cfg(feature = "metrics")]
540 {
541 metrics.internal_errors.fetch_add(1, Ordering::Relaxed);
542 metrics.exceptions_sent.fetch_add(1, Ordering::Relaxed);
543 }
544
545 send_rtu_exception(
546 &mut socket,
547 unit_id,
548 decoded.function_code().as_u8(),
549 ExceptionCode::ServerDeviceFailure,
550 )
551 .await?;
552 }
553 }
554 }
555}
556
557fn map_decode_error_to_exception(err: DecodeError) -> ExceptionCode {
558 match err {
559 DecodeError::InvalidFunctionCode => ExceptionCode::IllegalFunction,
560 DecodeError::InvalidLength | DecodeError::InvalidValue | DecodeError::UnexpectedEof => {
561 ExceptionCode::IllegalDataValue
562 }
563 DecodeError::InvalidCrc | DecodeError::Unsupported | DecodeError::Message(_) => {
564 ExceptionCode::ServerDeviceFailure
565 }
566 }
567}
568
569async fn send_exception(
570 socket: &mut TcpStream,
571 transaction_id: u16,
572 unit_id: u8,
573 function_code: u8,
574 exception_code: ExceptionCode,
575) -> Result<(), DataLinkError> {
576 let mut pdu = [0u8; 2];
577 let mut pdu_writer = Writer::new(&mut pdu);
578 ExceptionResponse {
579 function_code,
580 exception_code,
581 }
582 .encode(&mut pdu_writer)
583 .map_err(DataLinkError::Encode)?;
584
585 send_pdu(socket, transaction_id, unit_id, pdu_writer.as_written()).await
586}
587
588async fn send_pdu(
589 socket: &mut TcpStream,
590 transaction_id: u16,
591 unit_id: u8,
592 pdu: &[u8],
593) -> Result<(), DataLinkError> {
594 let mut frame = vec![0u8; tcp::MBAP_HEADER_LEN + pdu.len()];
595 let mut frame_writer = Writer::new(&mut frame);
596 tcp::encode_frame(&mut frame_writer, transaction_id, unit_id, pdu)?;
597
598 debug!(
599 correlation_id = transaction_id,
600 unit_id,
601 pdu_len = pdu.len(),
602 "sending modbus tcp server response"
603 );
604 socket.write_all(frame_writer.as_written()).await?;
605 Ok(())
606}
607
608async fn send_rtu_exception(
609 socket: &mut TcpStream,
610 unit_id: u8,
611 function_code: u8,
612 exception_code: ExceptionCode,
613) -> Result<(), DataLinkError> {
614 let mut pdu = [0u8; 2];
615 let mut pdu_writer = Writer::new(&mut pdu);
616 ExceptionResponse {
617 function_code,
618 exception_code,
619 }
620 .encode(&mut pdu_writer)
621 .map_err(DataLinkError::Encode)?;
622
623 send_rtu_pdu(socket, unit_id, pdu_writer.as_written()).await
624}
625
626async fn send_rtu_pdu(socket: &mut TcpStream, unit_id: u8, pdu: &[u8]) -> Result<(), DataLinkError> {
627 let mut frame = vec![0u8; pdu.len() + 3];
628 let mut writer = Writer::new(&mut frame);
629 rtu_frame::encode_frame(&mut writer, unit_id, pdu)?;
630 socket.write_all(writer.as_written()).await?;
631 Ok(())
632}
633
634#[cfg(test)]
635mod tests {
636 use super::{ModbusRtuOverTcpServer, ModbusService, ModbusTcpServer, ServiceError};
637 use crate::{DataLink, ModbusTcpTransport};
638 use rustmod_core::encoding::Writer;
639 use rustmod_core::frame::rtu as rtu_frame;
640 use rustmod_core::pdu::{DecodedRequest, ExceptionCode};
641 use tokio::io::{AsyncReadExt, AsyncWriteExt};
642 use tokio::net::TcpStream;
643
644 struct EchoReadService;
645
646 impl ModbusService for EchoReadService {
647 fn handle(
648 &self,
649 _unit_id: u8,
650 request: DecodedRequest<'_>,
651 response_pdu: &mut [u8],
652 ) -> Result<usize, ServiceError> {
653 match request {
654 DecodedRequest::ReadHoldingRegisters(_) => {
655 let bytes = [0x03u8, 0x02, 0x00, 0x2A];
656 response_pdu[..bytes.len()].copy_from_slice(&bytes);
657 Ok(bytes.len())
658 }
659 _ => Err(ServiceError::Exception(ExceptionCode::IllegalFunction)),
660 }
661 }
662 }
663
664 struct AlwaysExceptionService;
665
666 impl ModbusService for AlwaysExceptionService {
667 fn handle(
668 &self,
669 _unit_id: u8,
670 _request: DecodedRequest<'_>,
671 _response_pdu: &mut [u8],
672 ) -> Result<usize, ServiceError> {
673 Err(ServiceError::Exception(ExceptionCode::IllegalDataAddress))
674 }
675 }
676
677 #[tokio::test]
678 async fn tcp_server_handles_basic_read_request() {
679 let server = ModbusTcpServer::bind("127.0.0.1:0", EchoReadService)
680 .await
681 .unwrap();
682 let addr = server.local_addr().unwrap();
683 let task = tokio::spawn(server.run());
684
685 let transport = ModbusTcpTransport::connect(addr).await.unwrap();
686 let mut response = [0u8; 32];
687 let len = transport
688 .exchange(1, &[0x03, 0x00, 0x00, 0x00, 0x01], &mut response)
689 .await
690 .unwrap();
691 assert_eq!(&response[..len], &[0x03, 0x02, 0x00, 0x2A]);
692
693 task.abort();
694 let _ = task.await;
695 }
696
697 #[tokio::test]
698 async fn tcp_server_sends_exception_response() {
699 let server = ModbusTcpServer::bind("127.0.0.1:0", AlwaysExceptionService)
700 .await
701 .unwrap();
702 let addr = server.local_addr().unwrap();
703 let task = tokio::spawn(server.run());
704
705 let transport = ModbusTcpTransport::connect(addr).await.unwrap();
706 let mut response = [0u8; 32];
707 let len = transport
708 .exchange(1, &[0x03, 0x00, 0x00, 0x00, 0x01], &mut response)
709 .await
710 .unwrap();
711 assert_eq!(&response[..len], &[0x83, 0x02]);
712
713 task.abort();
714 let _ = task.await;
715 }
716
717 #[tokio::test]
718 async fn tcp_server_maps_decode_error_to_exception() {
719 let server = ModbusTcpServer::bind("127.0.0.1:0", EchoReadService)
720 .await
721 .unwrap();
722 let addr = server.local_addr().unwrap();
723 let task = tokio::spawn(server.run());
724
725 let transport = ModbusTcpTransport::connect(addr).await.unwrap();
726 let mut response = [0u8; 32];
727 let len = transport
728 .exchange(
729 1,
730 &[0x10, 0x00, 0x00, 0x00, 0x02, 0x03, 0x12, 0x34, 0x56],
731 &mut response,
732 )
733 .await
734 .unwrap();
735 assert_eq!(&response[..len], &[0x90, 0x03]);
736
737 task.abort();
738 let _ = task.await;
739 }
740
741 #[tokio::test]
742 async fn rtu_over_tcp_server_handles_basic_read_request() {
743 let server = ModbusRtuOverTcpServer::bind("127.0.0.1:0", EchoReadService)
744 .await
745 .unwrap();
746 let addr = server.local_addr().unwrap();
747 let task = tokio::spawn(server.run());
748
749 let mut stream = TcpStream::connect(addr).await.unwrap();
750 let mut request = [0u8; 16];
751 let mut writer = Writer::new(&mut request);
752 rtu_frame::encode_frame(&mut writer, 1, &[0x03, 0x00, 0x00, 0x00, 0x01]).unwrap();
753 stream.write_all(writer.as_written()).await.unwrap();
754
755 let mut response = [0u8; 7];
756 stream.read_exact(&mut response).await.unwrap();
757 let (unit_id, pdu) = rtu_frame::decode_frame(&response).unwrap();
758 assert_eq!(unit_id, 1);
759 assert_eq!(pdu, &[0x03, 0x02, 0x00, 0x2A]);
760
761 task.abort();
762 let _ = task.await;
763 }
764
765 #[tokio::test]
766 async fn rtu_over_tcp_server_maps_decode_error_to_exception() {
767 let server = ModbusRtuOverTcpServer::bind("127.0.0.1:0", EchoReadService)
768 .await
769 .unwrap();
770 let addr = server.local_addr().unwrap();
771 let task = tokio::spawn(server.run());
772
773 let mut stream = TcpStream::connect(addr).await.unwrap();
774 let mut request = [0u8; 32];
775 let mut writer = Writer::new(&mut request);
776 rtu_frame::encode_frame(
777 &mut writer,
778 1,
779 &[0x10, 0x00, 0x00, 0x00, 0x02, 0x03, 0x12, 0x34, 0x56],
780 )
781 .unwrap();
782 stream.write_all(writer.as_written()).await.unwrap();
783
784 let mut response = [0u8; 5];
785 stream.read_exact(&mut response).await.unwrap();
786 let (unit_id, pdu) = rtu_frame::decode_frame(&response).unwrap();
787 assert_eq!(unit_id, 1);
788 assert_eq!(pdu, &[0x90, 0x03]);
789
790 task.abort();
791 let _ = task.await;
792 }
793}