Skip to main content

rs_modbus/layers/application/
mod.rs

1use crate::error::ModbusError;
2use crate::layers::physical::{ConnectionId, ResponseFn};
3use crate::types::{ApplicationDataUnit, CustomFunctionCode, FramedDataUnit};
4use tokio::sync::broadcast;
5
6/// Application-layer role. Set by `ModbusMaster` / `ModbusSlave` when they
7/// take ownership of an application layer.
8///
9/// RTU framing differentiates request vs response by role (request and
10/// response of the same FC may have different lengths).
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
12pub enum ApplicationRole {
13    Master,
14    Slave,
15}
16
17/// Shared implementation of [`ApplicationLayer::set_role`].
18///
19/// Returns `Ok(())` on first set or re-setting the same role.
20/// Returns `InvalidState` if a *different* role is already assigned.
21pub(crate) fn set_role_impl(
22    current: &mut Option<ApplicationRole>,
23    role: ApplicationRole,
24) -> Result<(), ModbusError> {
25    match *current {
26        Some(existing) if existing == role => Ok(()),
27        Some(existing) => Err(ModbusError::InvalidState(format!(
28            "application layer role already set to {existing:?}, cannot change to {role:?}"
29        ))),
30        None => {
31            *current = Some(role);
32            Ok(())
33        }
34    }
35}
36
37/// Wire protocol implemented by an [`ApplicationLayer`]. Exposed through
38/// [`ApplicationLayer::protocol`] so callers (master, slave, tests) can
39/// gate protocol-specific behavior — notably, `ModbusMaster` uses it to
40/// reject `concurrent: true` configurations on non-TCP layers.
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub enum ApplicationProtocol {
43    Tcp,
44    Rtu,
45    Ascii,
46}
47
48/// A successfully framed PDU emitted by an [`ApplicationLayer`] via
49/// `subscribe_framing`. Carries the parsed ADU, the raw bytes that produced it,
50/// the per-message reply closure, and the connection it came from.
51#[derive(Clone)]
52pub struct Framing {
53    pub adu: ApplicationDataUnit,
54    pub raw: Vec<u8>,
55    pub response: ResponseFn,
56    pub connection: ConnectionId,
57}
58
59#[async_trait::async_trait]
60pub trait ApplicationLayer: Send + Sync {
61    /// Bind the application layer to a master/slave role. Must succeed on the
62    /// first call and fail (`ModbusError::InvalidState`) if a different role
63    /// is then assigned. Re-assigning the same role is a no-op (idempotent).
64    fn set_role(&self, role: ApplicationRole) -> Result<(), ModbusError>;
65
66    /// Current role, or `None` if not yet assigned.
67    fn role(&self) -> Option<ApplicationRole>;
68
69    /// Wire protocol implemented by this layer. Used by `ModbusMaster` to
70    /// validate `concurrent` configuration at construction time.
71    fn protocol(&self) -> ApplicationProtocol;
72
73    /// Encode an ADU into wire bytes per the protocol's framing format
74    /// (MBAP for TCP, CRC for RTU, hex+LRC for ASCII).
75    fn encode(&self, adu: &ApplicationDataUnit) -> Vec<u8>;
76
77    /// Decode a single complete frame back into an ADU. Returned only for
78    /// backward compatibility with the previous stateless API; new consumers
79    /// should subscribe to `subscribe_framing` instead.
80    fn decode(&self, data: &[u8]) -> Result<FramedDataUnit, ModbusError>;
81
82    /// Drop any per-connection state (decoding buffers, timers). Called by
83    /// Master before each request, by Slave between sessions.
84    fn flush(&self);
85
86    /// Subscribe to successfully framed PDUs assembled from the underlying
87    /// physical layer.
88    fn subscribe_framing(&self) -> broadcast::Receiver<Framing>;
89
90    /// Subscribe to framing-level errors (CRC/LRC failure, invalid MBAP
91    /// header, etc.). One error per offending physical-layer chunk.
92    fn subscribe_framing_error(&self) -> broadcast::Receiver<ModbusError>;
93
94    /// Register a custom function code predictor. Default is a no-op; only
95    /// [`RtuApplicationLayer`] overrides this with real behavior.
96    fn add_custom_function_code(&self, _cfc: CustomFunctionCode) {}
97
98    /// Remove a previously registered custom function code. Default is a no-op.
99    fn remove_custom_function_code(&self, _fc: u8) {}
100
101    /// Release task handles and drop physical-layer subscriptions.
102    async fn destroy(&self);
103}
104
105mod ascii;
106mod rtu;
107mod tcp;
108
109pub use ascii::{AsciiApplicationLayer, AsciiApplicationLayerOptions};
110pub use rtu::{FrameInterval, RtuApplicationLayer, RtuApplicationLayerOptions};
111pub use tcp::TcpApplicationLayer;
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116    use crate::layers::physical::{PhysicalLayer, TcpClientPhysicalLayer, TcpServerPhysicalLayer};
117    use std::sync::Arc;
118
119    // ===== Base types =====
120
121    #[test]
122    fn test_application_role_equality() {
123        assert_eq!(ApplicationRole::Master, ApplicationRole::Master);
124        assert_ne!(ApplicationRole::Master, ApplicationRole::Slave);
125    }
126
127    #[test]
128    fn test_framing_clone_preserves_fields() {
129        use crate::layers::physical::{ConnectionId, ResponseFn};
130
131        let response: ResponseFn = Arc::new(|_| Box::pin(async { Ok(()) }));
132        let conn: ConnectionId = Arc::from("test");
133        let framing = Framing {
134            adu: ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x0a]),
135            raw: vec![0xff; 4],
136            response,
137            connection: conn,
138        };
139        let cloned = framing.clone();
140        assert_eq!(cloned.adu.unit, 1);
141        assert_eq!(cloned.adu.fc, 0x03);
142        assert_eq!(cloned.raw, vec![0xff; 4]);
143        assert_eq!(&*cloned.connection, "test");
144    }
145
146    // ===== role / set_role =====
147
148    fn make_tcp_app() -> Arc<TcpApplicationLayer> {
149        // Bind to an idle server physical (never opened) so we have a real
150        // PhysicalLayer to subscribe; the spawned task simply awaits an event
151        // that will never come, which is fine for tests focused on
152        // encode/decode/role behavior.
153        let physical = TcpServerPhysicalLayer::new();
154        TcpApplicationLayer::new(physical)
155    }
156
157    fn make_rtu_app() -> Arc<RtuApplicationLayer> {
158        // RTU bound to a TCP-like (Net) physical: no inter-frame timer is
159        // needed and decode is stateless in this commit.
160        let physical = TcpClientPhysicalLayer::new();
161        RtuApplicationLayer::new(physical, RtuApplicationLayerOptions::default())
162    }
163
164    fn make_ascii_app() -> Arc<AsciiApplicationLayer> {
165        let physical = TcpServerPhysicalLayer::new();
166        AsciiApplicationLayer::new(physical)
167    }
168
169    #[tokio::test]
170    async fn test_set_role_first_call_succeeds() {
171        let app = make_tcp_app();
172        assert_eq!(app.role(), None);
173        app.set_role(ApplicationRole::Master).unwrap();
174        assert_eq!(app.role(), Some(ApplicationRole::Master));
175        app.destroy().await;
176    }
177
178    #[tokio::test]
179    async fn test_set_role_same_again_is_idempotent() {
180        let app = make_tcp_app();
181        app.set_role(ApplicationRole::Slave).unwrap();
182        app.set_role(ApplicationRole::Slave).unwrap();
183        assert_eq!(app.role(), Some(ApplicationRole::Slave));
184        app.destroy().await;
185    }
186
187    #[tokio::test]
188    async fn test_set_role_conflict_returns_invalid_state() {
189        let app = make_tcp_app();
190        app.set_role(ApplicationRole::Master).unwrap();
191        let err = app.set_role(ApplicationRole::Slave).unwrap_err();
192        assert!(matches!(err, ModbusError::InvalidState(_)));
193        app.destroy().await;
194    }
195
196    // ===== encode / decode =====
197
198    #[tokio::test]
199    async fn test_tcp_encode() {
200        let layer = make_tcp_app();
201        let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
202        let frame = layer.encode(&adu);
203        assert_eq!(frame.len(), 12);
204        assert_eq!(&frame[2..4], [0x00, 0x00]); // protocol = 0
205        assert_eq!(u16::from_be_bytes([frame[4], frame[5]]), 6); // length = 2 + 4
206        assert_eq!(frame[6], 1); // unit
207        assert_eq!(frame[7], 0x03); // fc
208        assert_eq!(&frame[8..], [0x00, 0x00, 0x00, 0x0a]);
209        layer.destroy().await;
210    }
211
212    #[tokio::test]
213    async fn test_tcp_encode_with_transaction() {
214        let layer = make_tcp_app();
215        let adu = ApplicationDataUnit::new(1, 0x03, vec![]).with_transaction(42);
216        let frame = layer.encode(&adu);
217        assert_eq!(u16::from_be_bytes([frame[0], frame[1]]), 42);
218        layer.destroy().await;
219    }
220
221    #[tokio::test]
222    async fn test_tcp_decode() {
223        let layer = make_tcp_app();
224        let frame = vec![0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
225        let decoded = layer.decode(&frame).unwrap();
226        assert_eq!(decoded.adu.transaction, Some(1));
227        assert_eq!(decoded.adu.unit, 1);
228        assert_eq!(decoded.adu.fc, 0x03);
229        assert_eq!(decoded.adu.data, vec![0x00, 0x0a]);
230        layer.destroy().await;
231    }
232
233    #[tokio::test]
234    async fn test_tcp_decode_invalid_protocol() {
235        let layer = make_tcp_app();
236        let frame = vec![0x00, 0x01, 0x00, 0x01, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
237        assert!(matches!(
238            layer.decode(&frame),
239            Err(ModbusError::InvalidData)
240        ));
241        layer.destroy().await;
242    }
243
244    #[tokio::test]
245    async fn test_tcp_roundtrip() {
246        let layer = make_tcp_app();
247        let adu =
248            ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]).with_transaction(5);
249        let encoded = layer.encode(&adu);
250        let decoded = layer.decode(&encoded).unwrap();
251        assert_eq!(decoded.adu.transaction, Some(5));
252        assert_eq!(decoded.adu.unit, 1);
253        assert_eq!(decoded.adu.fc, 0x03);
254        assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
255        layer.destroy().await;
256    }
257
258    #[tokio::test]
259    async fn test_rtu_encode() {
260        let layer = make_rtu_app();
261        let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
262        let frame = layer.encode(&adu);
263        assert_eq!(frame.len(), 8);
264        assert_eq!(frame[0], 1);
265        assert_eq!(frame[1], 0x03);
266        assert_eq!(&frame[2..6], [0x00, 0x00, 0x00, 0x0a]);
267        let crc_val = u16::from_le_bytes([frame[6], frame[7]]);
268        assert_eq!(crate::utils::crc(&frame[..6]), crc_val);
269        layer.destroy().await;
270    }
271
272    #[tokio::test]
273    async fn test_rtu_decode() {
274        let layer = make_rtu_app();
275        let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
276        let frame = layer.encode(&adu);
277        let decoded = layer.decode(&frame).unwrap();
278        assert_eq!(decoded.adu.unit, 1);
279        assert_eq!(decoded.adu.fc, 0x03);
280        assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
281        layer.destroy().await;
282    }
283
284    #[tokio::test]
285    async fn test_rtu_decode_crc_fail() {
286        let layer = make_rtu_app();
287        let frame = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a, 0xFF, 0xFF];
288        assert!(matches!(
289            layer.decode(&frame),
290            Err(ModbusError::CrcCheckFailed)
291        ));
292        layer.destroy().await;
293    }
294
295    #[tokio::test]
296    async fn test_rtu_roundtrip() {
297        let layer = make_rtu_app();
298        let adu = ApplicationDataUnit::new(
299            17,
300            0x10,
301            vec![0x00, 0x01, 0x00, 0x02, 0x04, 0xAB, 0xCD, 0xEF, 0x01],
302        );
303        let encoded = layer.encode(&adu);
304        let decoded = layer.decode(&encoded).unwrap();
305        assert_eq!(decoded.adu.unit, 17);
306        assert_eq!(decoded.adu.fc, 0x10);
307        assert_eq!(decoded.adu.data, adu.data);
308        layer.destroy().await;
309    }
310
311    #[tokio::test]
312    async fn test_ascii_encode() {
313        let layer = make_ascii_app();
314        let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
315        let frame = layer.encode(&adu);
316        let frame_str = String::from_utf8(frame.clone()).unwrap();
317        assert!(frame_str.starts_with(':'));
318        assert!(frame_str.ends_with("\r\n"));
319        assert_eq!(frame_str.len(), 1 + 2 + 2 + 8 + 2 + 2);
320        layer.destroy().await;
321    }
322
323    #[tokio::test]
324    async fn test_ascii_decode() {
325        let layer = make_ascii_app();
326        let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
327        let encoded = layer.encode(&adu);
328        let decoded = layer.decode(&encoded).unwrap();
329        assert_eq!(decoded.adu.unit, 1);
330        assert_eq!(decoded.adu.fc, 0x03);
331        assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
332        layer.destroy().await;
333    }
334
335    #[tokio::test]
336    async fn test_ascii_roundtrip() {
337        let layer = make_ascii_app();
338        let adu = ApplicationDataUnit::new(
339            17,
340            0x10,
341            vec![0x00, 0x01, 0x00, 0x02, 0x04, 0xAB, 0xCD, 0xEF, 0x01],
342        );
343        let encoded = layer.encode(&adu);
344        let decoded = layer.decode(&encoded).unwrap();
345        assert_eq!(decoded.adu.unit, 17);
346        assert_eq!(decoded.adu.fc, 0x10);
347        assert_eq!(decoded.adu.data, adu.data);
348        layer.destroy().await;
349    }
350
351    #[tokio::test]
352    async fn test_ascii_decode_lrc_fail() {
353        let layer = make_ascii_app();
354        let frame = b":01030000000AFF\r\n";
355        assert!(matches!(
356            layer.decode(frame),
357            Err(ModbusError::LrcCheckFailed)
358        ));
359        layer.destroy().await;
360    }
361
362    // ===== framing event end-to-end (TCP) =====
363
364    #[tokio::test]
365    async fn test_framing_emits_on_valid_tcp_frame() {
366        let server = TcpServerPhysicalLayer::new();
367        server.set_addr("127.0.0.1:0".to_string()).await;
368        server.open(None).await.unwrap();
369        let application = TcpApplicationLayer::new(server.clone());
370
371        // Bring up a peer client to push bytes at the server.
372        let client = TcpClientPhysicalLayer::new();
373        client.set_addr(server.get_addr().await.unwrap()).await;
374        client.open(None).await.unwrap();
375        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
376
377        let mut framing_rx = application.subscribe_framing();
378
379        let frame = vec![0x00, 0x07, 0x00, 0x00, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
380        client.write(&frame).await.unwrap();
381
382        let f = tokio::time::timeout(tokio::time::Duration::from_secs(2), framing_rx.recv())
383            .await
384            .expect("framing event within 2s")
385            .expect("framing channel still open");
386        assert_eq!(f.adu.transaction, Some(7));
387        assert_eq!(f.adu.unit, 1);
388        assert_eq!(f.adu.fc, 0x03);
389        assert_eq!(f.adu.data, vec![0x00, 0x0a]);
390
391        client.destroy().await;
392        server.destroy().await;
393        application.destroy().await;
394    }
395
396    #[tokio::test]
397    async fn test_framing_error_on_invalid_tcp_protocol() {
398        let server = TcpServerPhysicalLayer::new();
399        server.set_addr("127.0.0.1:0".to_string()).await;
400        server.open(None).await.unwrap();
401        let application = TcpApplicationLayer::new(server.clone());
402
403        let client = TcpClientPhysicalLayer::new();
404        client.set_addr(server.get_addr().await.unwrap()).await;
405        client.open(None).await.unwrap();
406        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
407
408        let mut err_rx = application.subscribe_framing_error();
409
410        // Bogus protocol_id (bytes 2..4 != 0).
411        let bogus = vec![0x00, 0x07, 0x12, 0x34, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
412        client.write(&bogus).await.unwrap();
413
414        let err = tokio::time::timeout(tokio::time::Duration::from_secs(2), err_rx.recv())
415            .await
416            .expect("framing_error event within 2s")
417            .expect("framing_error channel still open");
418        assert!(matches!(err, ModbusError::InvalidData));
419
420        client.destroy().await;
421        server.destroy().await;
422        application.destroy().await;
423    }
424}