1use std::time::{Duration, Instant};
18
19use bytes::BytesMut;
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::error::ModbusError;
23
24use super::frame::{
25 verify_crc, RtuFrame, RtuFrameError, RTU_MAX_FRAME_SIZE, RTU_MIN_FRAME_SIZE,
26};
27
28#[derive(Debug, Clone, Copy)]
32pub struct RtuTiming {
33 pub char_time: Duration,
35
36 pub inter_char_timeout: Duration,
38
39 pub inter_frame_timeout: Duration,
41}
42
43impl RtuTiming {
44 pub fn from_baud_rate(baud_rate: u32) -> Self {
61 Self::from_baud_rate_with_bits(baud_rate, 11)
62 }
63
64 pub fn from_baud_rate_with_bits(baud_rate: u32, bits_per_char: u32) -> Self {
71 let char_time_us = (bits_per_char as u64 * 1_000_000) / baud_rate as u64;
72 let char_time = Duration::from_micros(char_time_us);
73
74 let (inter_char, inter_frame) = if baud_rate > 19200 {
77 (Duration::from_micros(750), Duration::from_micros(1750))
78 } else {
79 (
80 char_time.mul_f32(1.5),
81 char_time.mul_f32(3.5),
82 )
83 };
84
85 Self {
86 char_time,
87 inter_char_timeout: inter_char,
88 inter_frame_timeout: inter_frame,
89 }
90 }
91
92 pub fn transmission_time(&self, bytes: usize) -> Duration {
94 self.char_time * bytes as u32
95 }
96}
97
98impl Default for RtuTiming {
99 fn default() -> Self {
100 Self::from_baud_rate(9600)
101 }
102}
103
104#[derive(Debug, Clone)]
106enum DecodeState {
107 Idle,
109
110 Receiving {
112 last_byte_time: Instant,
114 expected_length: Option<usize>,
116 },
117
118 Complete,
120}
121
122impl Default for DecodeState {
123 fn default() -> Self {
124 Self::Idle
125 }
126}
127
128#[derive(Debug)]
146pub struct RtuCodec {
147 timing: RtuTiming,
149
150 state: DecodeState,
152
153 buffer: BytesMut,
155
156 strict_timing: bool,
159
160 unit_id_filter: Option<Vec<u8>>,
162}
163
164impl RtuCodec {
165 pub fn new() -> Self {
167 Self::with_timing(RtuTiming::default())
168 }
169
170 pub fn with_timing(timing: RtuTiming) -> Self {
172 Self {
173 timing,
174 state: DecodeState::Idle,
175 buffer: BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
176 strict_timing: false,
177 unit_id_filter: None,
178 }
179 }
180
181 pub fn with_baud_rate(baud_rate: u32) -> Self {
183 Self::with_timing(RtuTiming::from_baud_rate(baud_rate))
184 }
185
186 pub fn strict_timing(mut self, enabled: bool) -> Self {
188 self.strict_timing = enabled;
189 self
190 }
191
192 pub fn unit_id_filter(mut self, unit_ids: Vec<u8>) -> Self {
196 self.unit_id_filter = Some(unit_ids);
197 self
198 }
199
200 pub fn timing(&self) -> &RtuTiming {
202 &self.timing
203 }
204
205 pub fn reset(&mut self) {
207 self.state = DecodeState::Idle;
208 self.buffer.clear();
209 }
210
211 fn try_parse_frame(&mut self) -> Result<Option<RtuFrame>, ModbusError> {
213 if self.buffer.len() < RTU_MIN_FRAME_SIZE {
214 return Ok(None);
215 }
216
217 let expected_len = self.estimate_frame_length();
219
220 match expected_len {
221 Some(len) if self.buffer.len() >= len => {
222 let frame_data = self.buffer.split_to(len);
224
225 match RtuFrame::decode(&frame_data) {
226 Ok(frame) => {
227 if let Some(ref filter) = self.unit_id_filter {
229 if !filter.contains(&frame.unit_id) && frame.unit_id != 0 {
230 self.state = DecodeState::Idle;
232 return Ok(None);
233 }
234 }
235
236 self.state = DecodeState::Idle;
237 Ok(Some(frame))
238 }
239 Err(RtuFrameError::CrcMismatch { expected, received }) => {
240 self.state = DecodeState::Idle;
242 Err(ModbusError::InvalidData(format!(
243 "CRC mismatch: expected 0x{:04X}, got 0x{:04X}",
244 expected, received
245 )))
246 }
247 Err(e) => {
248 self.state = DecodeState::Idle;
249 Err(ModbusError::InvalidData(e.to_string()))
250 }
251 }
252 }
253 Some(_) => {
254 Ok(None)
256 }
257 None if self.buffer.len() >= RTU_MAX_FRAME_SIZE => {
258 self.buffer.clear();
260 self.state = DecodeState::Idle;
261 Err(ModbusError::InvalidData(
262 "Unable to determine frame length, buffer overflow".into(),
263 ))
264 }
265 None => {
266 Ok(None)
268 }
269 }
270 }
271
272 fn estimate_frame_length(&self) -> Option<usize> {
274 if self.buffer.len() < 2 {
275 return None;
276 }
277
278 let function_code = self.buffer[1];
279
280 if function_code & 0x80 != 0 {
282 return Some(5);
284 }
285
286 match function_code {
287 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 => Some(8),
289
290 0x16 => Some(10),
293
294 0x0F | 0x10 => {
296 if self.buffer.len() >= 7 {
297 let byte_count = self.buffer[6] as usize;
298 Some(7 + byte_count + 2)
299 } else {
300 None
301 }
302 }
303
304 0x07 => Some(4),
306
307 0x08 => Some(8),
309
310 0x0B | 0x0C => Some(4),
312
313 0x11 => {
315 if self.buffer.len() >= 3 {
316 let byte_count = self.buffer[2] as usize;
317 Some(3 + byte_count + 2)
318 } else {
319 None
320 }
321 }
322
323 0x17 => {
325 if self.buffer.len() >= 11 {
326 let write_byte_count = self.buffer[10] as usize;
327 Some(11 + write_byte_count + 2)
328 } else {
329 None
330 }
331 }
332
333 _ => None,
335 }
336 }
337
338 fn check_frame_timeout(&mut self) -> bool {
340 if !self.strict_timing {
341 return false;
342 }
343
344 if let DecodeState::Receiving { last_byte_time, .. } = &self.state {
345 last_byte_time.elapsed() >= self.timing.inter_frame_timeout
346 } else {
347 false
348 }
349 }
350}
351
352impl Default for RtuCodec {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
358impl Decoder for RtuCodec {
359 type Item = RtuFrame;
360 type Error = ModbusError;
361
362 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
363 if self.check_frame_timeout() && !self.buffer.is_empty() {
365 if self.buffer.len() >= RTU_MIN_FRAME_SIZE && verify_crc(&self.buffer) {
367 return self.try_parse_frame();
368 } else {
369 self.buffer.clear();
371 self.state = DecodeState::Idle;
372 }
373 }
374
375 if src.is_empty() {
377 return Ok(None);
378 }
379
380 self.buffer.extend_from_slice(src);
382 src.clear();
383
384 self.state = DecodeState::Receiving {
386 last_byte_time: Instant::now(),
387 expected_length: self.estimate_frame_length(),
388 };
389
390 self.try_parse_frame()
392 }
393}
394
395impl Encoder<RtuFrame> for RtuCodec {
396 type Error = ModbusError;
397
398 fn encode(&mut self, item: RtuFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
399 if item.pdu.is_empty() {
401 return Err(ModbusError::InvalidData("PDU cannot be empty".into()));
402 }
403
404 if item.pdu.len() > super::frame::RTU_MAX_PDU_SIZE {
405 return Err(ModbusError::InvalidData(format!(
406 "PDU too large: {} bytes (max {})",
407 item.pdu.len(),
408 super::frame::RTU_MAX_PDU_SIZE
409 )));
410 }
411
412 dst.reserve(item.frame_size());
414 item.encode_to(dst);
415
416 Ok(())
417 }
418}
419
420#[derive(Debug)]
425pub struct StreamingRtuCodec {
426 inner: RtuCodec,
428
429 partial_frame: BytesMut,
431
432 last_byte_time: Option<Instant>,
434}
435
436impl StreamingRtuCodec {
437 pub fn new(timing: RtuTiming) -> Self {
439 Self {
440 inner: RtuCodec::with_timing(timing).strict_timing(true),
441 partial_frame: BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
442 last_byte_time: None,
443 }
444 }
445
446 pub fn process_byte(&mut self, byte: u8) -> Result<Option<RtuFrame>, ModbusError> {
450 let now = Instant::now();
451
452 if let Some(last_time) = self.last_byte_time {
454 if now.duration_since(last_time) >= self.inner.timing.inter_frame_timeout {
455 if !self.partial_frame.is_empty() {
457 if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE
458 && verify_crc(&self.partial_frame)
459 {
460 let frame_data = std::mem::replace(
461 &mut self.partial_frame,
462 BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
463 );
464 self.last_byte_time = Some(now);
465 self.partial_frame.extend_from_slice(&[byte]);
466
467 return RtuFrame::decode(&frame_data)
468 .map(Some)
469 .map_err(|e| ModbusError::InvalidData(e.to_string()));
470 } else {
471 self.partial_frame.clear();
473 }
474 }
475 }
476 }
477
478 self.last_byte_time = Some(now);
479 self.partial_frame.extend_from_slice(&[byte]);
480
481 if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE {
483 if let Some(expected_len) = self.inner.estimate_frame_length() {
484 if self.partial_frame.len() >= expected_len
485 && verify_crc(&self.partial_frame[..expected_len])
486 {
487 let frame_data = self.partial_frame.split_to(expected_len);
488 return RtuFrame::decode(&frame_data)
489 .map(Some)
490 .map_err(|e| ModbusError::InvalidData(e.to_string()));
491 }
492 }
493 }
494
495 if self.partial_frame.len() >= RTU_MAX_FRAME_SIZE {
497 self.partial_frame.clear();
498 return Err(ModbusError::InvalidData("Frame buffer overflow".into()));
499 }
500
501 Ok(None)
502 }
503
504 pub fn check_timeout(&mut self) -> Result<Option<RtuFrame>, ModbusError> {
508 if let Some(last_time) = self.last_byte_time {
509 if Instant::now().duration_since(last_time) >= self.inner.timing.inter_frame_timeout {
510 if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE
511 && verify_crc(&self.partial_frame)
512 {
513 let frame_data = std::mem::replace(
514 &mut self.partial_frame,
515 BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
516 );
517 self.last_byte_time = None;
518
519 return RtuFrame::decode(&frame_data)
520 .map(Some)
521 .map_err(|e| ModbusError::InvalidData(e.to_string()));
522 } else if !self.partial_frame.is_empty() {
523 self.partial_frame.clear();
525 self.last_byte_time = None;
526 }
527 }
528 }
529
530 Ok(None)
531 }
532
533 pub fn reset(&mut self) {
535 self.inner.reset();
536 self.partial_frame.clear();
537 self.last_byte_time = None;
538 }
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544
545 #[test]
546 fn test_timing_calculation() {
547 let timing = RtuTiming::from_baud_rate(9600);
548
549 let char_time_us = timing.char_time.as_micros();
551 assert!(char_time_us > 1100 && char_time_us < 1200);
552
553 let inter_frame_us = timing.inter_frame_timeout.as_micros();
555 assert!(inter_frame_us > 3500 && inter_frame_us < 4500);
556 }
557
558 #[test]
559 fn test_high_baud_rate_minimums() {
560 let timing = RtuTiming::from_baud_rate(115200);
562
563 assert_eq!(timing.inter_char_timeout, Duration::from_micros(750));
564 assert_eq!(timing.inter_frame_timeout, Duration::from_micros(1750));
565 }
566
567 #[test]
568 fn test_codec_encode_decode() {
569 let mut codec = RtuCodec::new();
570
571 let frame = RtuFrame::new(1, vec![0x03, 0x00, 0x00, 0x00, 0x0A]);
573
574 let mut buf = BytesMut::new();
576 codec.encode(frame.clone(), &mut buf).unwrap();
577
578 assert_eq!(buf.len(), 8); let mut codec = RtuCodec::new();
583 let decoded = codec.decode(&mut buf).unwrap().unwrap();
584
585 assert_eq!(decoded.unit_id, frame.unit_id);
586 assert_eq!(decoded.pdu, frame.pdu);
587 }
588
589 #[test]
590 fn test_codec_partial_frame() {
591 let mut codec = RtuCodec::new();
592
593 let frame = RtuFrame::new(1, vec![0x03, 0x00, 0x00, 0x00, 0x0A]);
595 let full = frame.encode();
596
597 let mut buf = BytesMut::from(&full[..3]);
599 let result = codec.decode(&mut buf).unwrap();
600 assert!(result.is_none());
601
602 let mut remaining = BytesMut::from(&full[3..]);
604 let result = codec.decode(&mut remaining).unwrap();
605 assert!(result.is_some());
606 }
607
608 #[test]
609 fn test_codec_exception_frame() {
610 let mut codec = RtuCodec::new();
611
612 let frame = RtuFrame::exception(1, 0x03, 0x02);
614
615 let mut buf = BytesMut::new();
616 codec.encode(frame.clone(), &mut buf).unwrap();
617
618 assert_eq!(buf.len(), 5);
620
621 let decoded = codec.decode(&mut buf).unwrap().unwrap();
622 assert!(decoded.is_exception());
623 }
624
625 #[test]
626 fn test_codec_unit_id_filter() {
627 let mut codec = RtuCodec::new().unit_id_filter(vec![1, 2]);
628
629 let frame1 = RtuFrame::new(1, vec![0x03, 0x00, 0x00, 0x00, 0x0A]);
631 let mut buf = frame1.encode().into();
632
633 let result = codec.decode(&mut buf).unwrap();
634 assert!(result.is_some());
635
636 codec.reset();
638 let frame5 = RtuFrame::new(5, vec![0x03, 0x00, 0x00, 0x00, 0x0A]);
639 let mut buf = frame5.encode().into();
640
641 let result = codec.decode(&mut buf).unwrap();
642 assert!(result.is_none()); }
644
645 #[test]
646 fn test_transmission_time() {
647 let timing = RtuTiming::from_baud_rate(9600);
648
649 let time = timing.transmission_time(8);
651 let time_ms = time.as_millis();
652 assert!(time_ms >= 8 && time_ms <= 10);
653 }
654}