Skip to main content

mabi_modbus/rtu/
codec.rs

//! Modbus RTU codec for async I/O.
//!
//! This module provides a tokio-compatible codec for encoding and decoding
//! Modbus RTU frames with proper timing-based frame detection.
//!
//! # Frame Detection
//!
//! Unlike TCP, which has length headers, RTU relies on timing gaps to detect
//! frame boundaries:
//!
//! - **Inter-frame gap**: 3.5 character times of silence marks end of frame
//! - **Inter-character gap**: Max 1.5 character times between bytes in a frame
//!
//! This codec implements both timing-based detection (when timing info is available)
//! and function-code based detection as a fallback.
16
17use std::time::{Duration, Instant};
18
19use bytes::BytesMut;
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::error::ModbusError;
23
24use super::frame::{
25    verify_crc, RtuFrame, RtuFrameError, RTU_MAX_FRAME_SIZE, RTU_MIN_FRAME_SIZE,
26};
27
/// RTU timing configuration.
///
/// Holds the time needed to transmit one character together with the two
/// silence intervals derived from it that are used for frame detection.
#[derive(Debug, Clone, Copy)]
pub struct RtuTiming {
    /// Time for one character (start + data + parity + stop bits).
    pub char_time: Duration,

    /// Inter-character timeout (1.5 character times).
    pub inter_char_timeout: Duration,

    /// Inter-frame timeout (3.5 character times).
    pub inter_frame_timeout: Duration,
}

impl RtuTiming {
    /// Create timing configuration for a specific baud rate.
    ///
    /// Assumes 11 bits per character (1 start + 8 data + 1 parity + 1 stop).
    ///
    /// # Arguments
    ///
    /// * `baud_rate` - Serial baud rate (e.g., 9600, 19200, 115200)
    ///
    /// # Example
    ///
    /// ```
    /// use mabi_modbus::rtu::RtuTiming;
    ///
    /// let timing = RtuTiming::from_baud_rate(9600);
    /// // At 9600 baud: char_time ≈ 1.145ms, inter_frame ≈ 4ms
    /// ```
    pub fn from_baud_rate(baud_rate: u32) -> Self {
        Self::from_baud_rate_with_bits(baud_rate, 11)
    }

    /// Create timing configuration with custom bits per character.
    ///
    /// The character time is computed in whole microseconds (truncated). For
    /// baud rates above 19200 the inter-character and inter-frame timeouts are
    /// fixed at 750 µs and 1750 µs; otherwise they are exactly 1.5 and 3.5
    /// character times.
    ///
    /// A `baud_rate` of `0` is treated as `1` so that this constructor never
    /// panics with a division by zero.
    ///
    /// # Arguments
    ///
    /// * `baud_rate` - Serial baud rate
    /// * `bits_per_char` - Total bits per character (typically 10-11)
    pub fn from_baud_rate_with_bits(baud_rate: u32, bits_per_char: u32) -> Self {
        // Clamp the divisor: a zero baud rate is meaningless, but it must not
        // bring the caller down with a divide-by-zero panic.
        let divisor = u64::from(baud_rate.max(1));
        let char_time = Duration::from_micros(u64::from(bits_per_char) * 1_000_000 / divisor);

        // Modbus spec: 1.5 char times inter-character, 3.5 char times inter-frame.
        // For high baud rates (> 19200), use fixed minimums.
        let (inter_char_timeout, inter_frame_timeout) = if baud_rate > 19200 {
            (Duration::from_micros(750), Duration::from_micros(1750))
        } else {
            // 1.5 = 3/2 and 3.5 = 7/2: integer arithmetic keeps the result
            // exact instead of going through a lossy f32 round trip.
            (char_time * 3 / 2, char_time * 7 / 2)
        };

        Self {
            char_time,
            inter_char_timeout,
            inter_frame_timeout,
        }
    }

    /// Get transmission time for a given number of bytes.
    ///
    /// The result saturates at [`Duration::MAX`] instead of panicking when the
    /// product does not fit, and byte counts above `u32::MAX` are treated as
    /// `u32::MAX`.
    pub fn transmission_time(&self, bytes: usize) -> Duration {
        let count = u32::try_from(bytes).unwrap_or(u32::MAX);
        self.char_time.saturating_mul(count)
    }
}

impl Default for RtuTiming {
    /// Timing for the common 9600 baud, 11 bits-per-character configuration.
    fn default() -> Self {
        Self::from_baud_rate(9600)
    }
}
103
/// Frame detection state.
///
/// A freshly constructed state is [`DecodeState::Idle`].
#[derive(Debug, Clone, Default)]
enum DecodeState {
    /// No frame in progress; waiting for the first byte of a new frame.
    #[default]
    Idle,

    /// Frame data is being accumulated.
    Receiving {
        /// Instant at which the most recent data was received.
        last_byte_time: Instant,
        /// Frame length predicted from the function code, when it can be predicted.
        expected_length: Option<usize>,
    },

    /// A whole frame has been gathered and is ready to be emitted.
    Complete,
}
127
128/// Modbus RTU codec.
129///
130/// This codec handles the encoding and decoding of RTU frames with
131/// proper timing-based frame detection.
132///
133/// # Example
134///
135/// ```rust,no_run
136/// use mabi_modbus::rtu::{RtuCodec, RtuTiming};
137/// use tokio_util::codec::Framed;
138///
139/// // Create codec with default timing (9600 baud)
140/// let codec = RtuCodec::new();
141///
142/// // Create codec with custom timing
143/// let codec = RtuCodec::with_timing(RtuTiming::from_baud_rate(115200));
144/// ```
145#[derive(Debug)]
146pub struct RtuCodec {
147    /// Timing configuration.
148    timing: RtuTiming,
149
150    /// Current decode state.
151    state: DecodeState,
152
153    /// Buffer for accumulating frame data.
154    buffer: BytesMut,
155
156    /// Enable strict timing-based frame detection.
157    /// When false, uses function-code based detection only.
158    strict_timing: bool,
159
160    /// Unit ID filter (None = accept all).
161    unit_id_filter: Option<Vec<u8>>,
162}
163
164impl RtuCodec {
165    /// Create a new RTU codec with default timing (9600 baud).
166    pub fn new() -> Self {
167        Self::with_timing(RtuTiming::default())
168    }
169
170    /// Create a codec with specific timing configuration.
171    pub fn with_timing(timing: RtuTiming) -> Self {
172        Self {
173            timing,
174            state: DecodeState::Idle,
175            buffer: BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
176            strict_timing: false,
177            unit_id_filter: None,
178        }
179    }
180
181    /// Create a codec for a specific baud rate.
182    pub fn with_baud_rate(baud_rate: u32) -> Self {
183        Self::with_timing(RtuTiming::from_baud_rate(baud_rate))
184    }
185
186    /// Enable strict timing-based frame detection.
187    pub fn strict_timing(mut self, enabled: bool) -> Self {
188        self.strict_timing = enabled;
189        self
190    }
191
192    /// Set unit ID filter.
193    ///
194    /// Only frames with matching unit IDs will be accepted.
195    pub fn unit_id_filter(mut self, unit_ids: Vec<u8>) -> Self {
196        self.unit_id_filter = Some(unit_ids);
197        self
198    }
199
200    /// Get the timing configuration.
201    pub fn timing(&self) -> &RtuTiming {
202        &self.timing
203    }
204
205    /// Reset the codec state.
206    pub fn reset(&mut self) {
207        self.state = DecodeState::Idle;
208        self.buffer.clear();
209    }
210
211    /// Try to parse a complete frame from the buffer.
212    fn try_parse_frame(&mut self) -> Result<Option<RtuFrame>, ModbusError> {
213        if self.buffer.len() < RTU_MIN_FRAME_SIZE {
214            return Ok(None);
215        }
216
217        // Try to determine expected length from function code
218        let expected_len = self.estimate_frame_length();
219
220        match expected_len {
221            Some(len) if self.buffer.len() >= len => {
222                // We have enough data, try to decode
223                let frame_data = self.buffer.split_to(len);
224
225                match RtuFrame::decode(&frame_data) {
226                    Ok(frame) => {
227                        // Check unit ID filter
228                        if let Some(ref filter) = self.unit_id_filter {
229                            if !filter.contains(&frame.unit_id) && frame.unit_id != 0 {
230                                // Ignore frame, reset and continue
231                                self.state = DecodeState::Idle;
232                                return Ok(None);
233                            }
234                        }
235
236                        self.state = DecodeState::Idle;
237                        Ok(Some(frame))
238                    }
239                    Err(RtuFrameError::CrcMismatch { expected, received }) => {
240                        // CRC error - frame is corrupt
241                        self.state = DecodeState::Idle;
242                        Err(ModbusError::InvalidData(format!(
243                            "CRC mismatch: expected 0x{:04X}, got 0x{:04X}",
244                            expected, received
245                        )))
246                    }
247                    Err(e) => {
248                        self.state = DecodeState::Idle;
249                        Err(ModbusError::InvalidData(e.to_string()))
250                    }
251                }
252            }
253            Some(_) => {
254                // Need more data
255                Ok(None)
256            }
257            None if self.buffer.len() >= RTU_MAX_FRAME_SIZE => {
258                // Buffer full but can't determine frame - discard
259                self.buffer.clear();
260                self.state = DecodeState::Idle;
261                Err(ModbusError::InvalidData(
262                    "Unable to determine frame length, buffer overflow".into(),
263                ))
264            }
265            None => {
266                // Unknown function code, wait for more data
267                Ok(None)
268            }
269        }
270    }
271
272    /// Estimate frame length based on function code.
273    fn estimate_frame_length(&self) -> Option<usize> {
274        if self.buffer.len() < 2 {
275            return None;
276        }
277
278        let function_code = self.buffer[1];
279
280        // Handle exception responses
281        if function_code & 0x80 != 0 {
282            // Exception: Unit + FC + ExCode + CRC = 5 bytes
283            return Some(5);
284        }
285
286        match function_code {
287            // Read requests and single writes (fixed 8 bytes)
288            0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 => Some(8),
289
290            // Mask Write Register (fixed 10 bytes)
291            // Unit + FC + Addr(2) + And_Mask(2) + Or_Mask(2) + CRC(2)
292            0x16 => Some(10),
293
294            // Write multiple coils / registers
295            0x0F | 0x10 => {
296                if self.buffer.len() >= 7 {
297                    let byte_count = self.buffer[6] as usize;
298                    Some(7 + byte_count + 2)
299                } else {
300                    None
301                }
302            }
303
304            // Read exception status
305            0x07 => Some(4),
306
307            // Diagnostics
308            0x08 => Some(8),
309
310            // Get comm event counter / log
311            0x0B | 0x0C => Some(4),
312
313            // Report server ID (variable)
314            0x11 => {
315                if self.buffer.len() >= 3 {
316                    let byte_count = self.buffer[2] as usize;
317                    Some(3 + byte_count + 2)
318                } else {
319                    None
320                }
321            }
322
323            // Read/Write multiple registers
324            0x17 => {
325                if self.buffer.len() >= 11 {
326                    let write_byte_count = self.buffer[10] as usize;
327                    Some(11 + write_byte_count + 2)
328                } else {
329                    None
330                }
331            }
332
333            // Unknown - use timing or max size
334            _ => None,
335        }
336    }
337
338    /// Check if inter-frame timeout has elapsed.
339    fn check_frame_timeout(&mut self) -> bool {
340        if !self.strict_timing {
341            return false;
342        }
343
344        if let DecodeState::Receiving { last_byte_time, .. } = &self.state {
345            last_byte_time.elapsed() >= self.timing.inter_frame_timeout
346        } else {
347            false
348        }
349    }
350}
351
352impl Default for RtuCodec {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358impl Decoder for RtuCodec {
359    type Item = RtuFrame;
360    type Error = ModbusError;
361
362    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
363        // Check for inter-frame timeout (frame complete by timing)
364        if self.check_frame_timeout() && !self.buffer.is_empty() {
365            // Try to parse what we have
366            if self.buffer.len() >= RTU_MIN_FRAME_SIZE && verify_crc(&self.buffer) {
367                return self.try_parse_frame();
368            } else {
369                // Invalid frame, discard
370                self.buffer.clear();
371                self.state = DecodeState::Idle;
372            }
373        }
374
375        // Process incoming data
376        if src.is_empty() {
377            return Ok(None);
378        }
379
380        // Move data to internal buffer
381        self.buffer.extend_from_slice(src);
382        src.clear();
383
384        // Update state
385        self.state = DecodeState::Receiving {
386            last_byte_time: Instant::now(),
387            expected_length: self.estimate_frame_length(),
388        };
389
390        // Try to parse a complete frame
391        self.try_parse_frame()
392    }
393}
394
395impl Encoder<RtuFrame> for RtuCodec {
396    type Error = ModbusError;
397
398    fn encode(&mut self, item: RtuFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
399        // Validate PDU size
400        if item.pdu.is_empty() {
401            return Err(ModbusError::InvalidData("PDU cannot be empty".into()));
402        }
403
404        if item.pdu.len() > super::frame::RTU_MAX_PDU_SIZE {
405            return Err(ModbusError::InvalidData(format!(
406                "PDU too large: {} bytes (max {})",
407                item.pdu.len(),
408                super::frame::RTU_MAX_PDU_SIZE
409            )));
410        }
411
412        // Reserve space and encode
413        dst.reserve(item.frame_size());
414        item.encode_to(dst);
415
416        Ok(())
417    }
418}
419
420/// Streaming RTU codec for continuous byte processing.
421///
422/// This codec is designed for scenarios where bytes arrive continuously
423/// (e.g., from a physical serial port) and timing information is critical.
424#[derive(Debug)]
425pub struct StreamingRtuCodec {
426    /// Base codec for frame handling.
427    inner: RtuCodec,
428
429    /// Current partial frame buffer.
430    partial_frame: BytesMut,
431
432    /// Time of last byte received.
433    last_byte_time: Option<Instant>,
434}
435
436impl StreamingRtuCodec {
437    /// Create a new streaming codec.
438    pub fn new(timing: RtuTiming) -> Self {
439        Self {
440            inner: RtuCodec::with_timing(timing).strict_timing(true),
441            partial_frame: BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
442            last_byte_time: None,
443        }
444    }
445
446    /// Process a single byte.
447    ///
448    /// Returns a frame if one is complete.
449    pub fn process_byte(&mut self, byte: u8) -> Result<Option<RtuFrame>, ModbusError> {
450        let now = Instant::now();
451
452        // Check for inter-frame gap
453        if let Some(last_time) = self.last_byte_time {
454            if now.duration_since(last_time) >= self.inner.timing.inter_frame_timeout {
455                // Gap detected - previous frame is complete
456                if !self.partial_frame.is_empty() {
457                    if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE
458                        && verify_crc(&self.partial_frame)
459                    {
460                        let frame_data = std::mem::replace(
461                            &mut self.partial_frame,
462                            BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
463                        );
464                        self.last_byte_time = Some(now);
465                        self.partial_frame.extend_from_slice(&[byte]);
466
467                        return RtuFrame::decode(&frame_data)
468                            .map(Some)
469                            .map_err(|e| ModbusError::InvalidData(e.to_string()));
470                    } else {
471                        // Invalid frame, discard
472                        self.partial_frame.clear();
473                    }
474                }
475            }
476        }
477
478        self.last_byte_time = Some(now);
479        self.partial_frame.extend_from_slice(&[byte]);
480
481        // Check if we have a complete frame
482        if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE {
483            if let Some(expected_len) = self.inner.estimate_frame_length() {
484                if self.partial_frame.len() >= expected_len
485                    && verify_crc(&self.partial_frame[..expected_len])
486                {
487                    let frame_data = self.partial_frame.split_to(expected_len);
488                    return RtuFrame::decode(&frame_data)
489                        .map(Some)
490                        .map_err(|e| ModbusError::InvalidData(e.to_string()));
491                }
492            }
493        }
494
495        // Check for buffer overflow
496        if self.partial_frame.len() >= RTU_MAX_FRAME_SIZE {
497            self.partial_frame.clear();
498            return Err(ModbusError::InvalidData("Frame buffer overflow".into()));
499        }
500
501        Ok(None)
502    }
503
504    /// Check if there's a pending frame due to timeout.
505    ///
506    /// Call this periodically when no bytes are being received.
507    pub fn check_timeout(&mut self) -> Result<Option<RtuFrame>, ModbusError> {
508        if let Some(last_time) = self.last_byte_time {
509            if Instant::now().duration_since(last_time) >= self.inner.timing.inter_frame_timeout {
510                if self.partial_frame.len() >= RTU_MIN_FRAME_SIZE
511                    && verify_crc(&self.partial_frame)
512                {
513                    let frame_data = std::mem::replace(
514                        &mut self.partial_frame,
515                        BytesMut::with_capacity(RTU_MAX_FRAME_SIZE),
516                    );
517                    self.last_byte_time = None;
518
519                    return RtuFrame::decode(&frame_data)
520                        .map(Some)
521                        .map_err(|e| ModbusError::InvalidData(e.to_string()));
522                } else if !self.partial_frame.is_empty() {
523                    // Invalid frame, discard
524                    self.partial_frame.clear();
525                    self.last_byte_time = None;
526                }
527            }
528        }
529
530        Ok(None)
531    }
532
533    /// Reset the codec state.
534    pub fn reset(&mut self) {
535        self.inner.reset();
536        self.partial_frame.clear();
537        self.last_byte_time = None;
538    }
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a frame for `unit_id` with the PDU `03 00 00 00 0A`.
    fn sample_request(unit_id: u8) -> RtuFrame {
        RtuFrame::new(unit_id, vec![0x03, 0x00, 0x00, 0x00, 0x0A])
    }

    #[test]
    fn test_timing_calculation() {
        let timing = RtuTiming::from_baud_rate(9600);

        // 11 bits per char at 9600 baud gives a character time near 1.145ms.
        let char_us = timing.char_time.as_micros();
        assert!((1101..1200).contains(&char_us));

        // 3.5 character times is roughly 4ms.
        let gap_us = timing.inter_frame_timeout.as_micros();
        assert!((3501..4500).contains(&gap_us));
    }

    #[test]
    fn test_high_baud_rate_minimums() {
        // Above 19200 baud the timeouts are the fixed minimums.
        let RtuTiming {
            inter_char_timeout,
            inter_frame_timeout,
            ..
        } = RtuTiming::from_baud_rate(115200);

        assert_eq!(inter_char_timeout, Duration::from_micros(750));
        assert_eq!(inter_frame_timeout, Duration::from_micros(1750));
    }

    #[test]
    fn test_codec_encode_decode() {
        let original = sample_request(1);

        // Encode with one codec instance...
        let mut encoder = RtuCodec::new();
        let mut wire = BytesMut::new();
        encoder.encode(original.clone(), &mut wire).unwrap();

        // Unit ID (1) + PDU (5) + CRC (2).
        assert_eq!(wire.len(), 8);

        // ...and decode with a fresh one.
        let mut decoder = RtuCodec::new();
        let round_tripped = decoder.decode(&mut wire).unwrap().unwrap();

        assert_eq!(round_tripped.unit_id, original.unit_id);
        assert_eq!(round_tripped.pdu, original.pdu);
    }

    #[test]
    fn test_codec_partial_frame() {
        let mut codec = RtuCodec::new();
        let encoded = sample_request(1).encode();

        // The first three bytes alone are not a frame yet.
        let mut head = BytesMut::from(&encoded[..3]);
        assert!(codec.decode(&mut head).unwrap().is_none());

        // Supplying the rest completes it.
        let mut tail = BytesMut::from(&encoded[3..]);
        assert!(codec.decode(&mut tail).unwrap().is_some());
    }

    #[test]
    fn test_codec_exception_frame() {
        let mut codec = RtuCodec::new();

        // Exception response for function 0x03 with exception code 0x02.
        let exception = RtuFrame::exception(1, 0x03, 0x02);

        let mut wire = BytesMut::new();
        codec.encode(exception.clone(), &mut wire).unwrap();

        // Exception frames are five bytes long.
        assert_eq!(wire.len(), 5);

        let decoded = codec.decode(&mut wire).unwrap().unwrap();
        assert!(decoded.is_exception());
    }

    #[test]
    fn test_codec_unit_id_filter() {
        let mut codec = RtuCodec::new().unit_id_filter(vec![1, 2]);

        // Unit 1 is in the filter list and is let through.
        let mut accepted: BytesMut = sample_request(1).encode().into();
        assert!(codec.decode(&mut accepted).unwrap().is_some());

        // Unit 5 is not in the list and is dropped.
        codec.reset();
        let mut rejected: BytesMut = sample_request(5).encode().into();
        assert!(codec.decode(&mut rejected).unwrap().is_none()); // Filtered out
    }

    #[test]
    fn test_transmission_time() {
        let timing = RtuTiming::from_baud_rate(9600);

        // Eight characters at 9600 baud take about 9.16ms.
        let elapsed_ms = timing.transmission_time(8).as_millis();
        assert!((8..=10).contains(&elapsed_ms));
    }
}