1use crate::error::ModbusError;
2use crate::layers::physical::{ConnectionId, ResponseFn};
3use crate::types::{ApplicationDataUnit, CustomFunctionCode, FramedDataUnit};
4use tokio::sync::broadcast;
5
6#[derive(Clone, Copy, Debug, PartialEq, Eq)]
12pub enum ApplicationRole {
13 Master,
14 Slave,
15}
16
17pub(crate) fn set_role_impl(
22 current: &mut Option<ApplicationRole>,
23 role: ApplicationRole,
24) -> Result<(), ModbusError> {
25 match *current {
26 Some(existing) if existing == role => Ok(()),
27 Some(existing) => Err(ModbusError::InvalidState(format!(
28 "application layer role already set to {existing:?}, cannot change to {role:?}"
29 ))),
30 None => {
31 *current = Some(role);
32 Ok(())
33 }
34 }
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub enum ApplicationProtocol {
43 Tcp,
44 Rtu,
45 Ascii,
46}
47
48#[derive(Clone)]
52pub struct Framing {
53 pub adu: ApplicationDataUnit,
54 pub raw: Vec<u8>,
55 pub response: ResponseFn,
56 pub connection: ConnectionId,
57}
58
59#[async_trait::async_trait]
60pub trait ApplicationLayer: Send + Sync {
61 fn set_role(&self, role: ApplicationRole) -> Result<(), ModbusError>;
65
66 fn role(&self) -> Option<ApplicationRole>;
68
69 fn protocol(&self) -> ApplicationProtocol;
72
73 fn encode(&self, adu: &ApplicationDataUnit) -> Vec<u8>;
76
77 fn decode(&self, data: &[u8]) -> Result<FramedDataUnit, ModbusError>;
81
82 fn flush(&self);
85
86 fn subscribe_framing(&self) -> broadcast::Receiver<Framing>;
89
90 fn subscribe_framing_error(&self) -> broadcast::Receiver<ModbusError>;
93
94 fn add_custom_function_code(&self, _cfc: CustomFunctionCode) {}
97
98 fn remove_custom_function_code(&self, _fc: u8) {}
100
101 async fn destroy(&self);
103}
104
105mod ascii;
106mod rtu;
107mod tcp;
108
109pub use ascii::{AsciiApplicationLayer, AsciiApplicationLayerOptions};
110pub use rtu::{FrameInterval, RtuApplicationLayer, RtuApplicationLayerOptions};
111pub use tcp::TcpApplicationLayer;
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use crate::layers::physical::{PhysicalLayer, TcpClientPhysicalLayer, TcpServerPhysicalLayer};
117 use std::sync::Arc;
118
119 #[test]
122 fn test_application_role_equality() {
123 assert_eq!(ApplicationRole::Master, ApplicationRole::Master);
124 assert_ne!(ApplicationRole::Master, ApplicationRole::Slave);
125 }
126
127 #[test]
128 fn test_framing_clone_preserves_fields() {
129 use crate::layers::physical::{ConnectionId, ResponseFn};
130
131 let response: ResponseFn = Arc::new(|_| Box::pin(async { Ok(()) }));
132 let conn: ConnectionId = Arc::from("test");
133 let framing = Framing {
134 adu: ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x0a]),
135 raw: vec![0xff; 4],
136 response,
137 connection: conn,
138 };
139 let cloned = framing.clone();
140 assert_eq!(cloned.adu.unit, 1);
141 assert_eq!(cloned.adu.fc, 0x03);
142 assert_eq!(cloned.raw, vec![0xff; 4]);
143 assert_eq!(&*cloned.connection, "test");
144 }
145
146 fn make_tcp_app() -> Arc<TcpApplicationLayer> {
149 let physical = TcpServerPhysicalLayer::new();
154 TcpApplicationLayer::new(physical)
155 }
156
157 fn make_rtu_app() -> Arc<RtuApplicationLayer> {
158 let physical = TcpClientPhysicalLayer::new();
161 RtuApplicationLayer::new(physical, RtuApplicationLayerOptions::default())
162 }
163
164 fn make_ascii_app() -> Arc<AsciiApplicationLayer> {
165 let physical = TcpServerPhysicalLayer::new();
166 AsciiApplicationLayer::new(physical)
167 }
168
169 #[tokio::test]
170 async fn test_set_role_first_call_succeeds() {
171 let app = make_tcp_app();
172 assert_eq!(app.role(), None);
173 app.set_role(ApplicationRole::Master).unwrap();
174 assert_eq!(app.role(), Some(ApplicationRole::Master));
175 app.destroy().await;
176 }
177
178 #[tokio::test]
179 async fn test_set_role_same_again_is_idempotent() {
180 let app = make_tcp_app();
181 app.set_role(ApplicationRole::Slave).unwrap();
182 app.set_role(ApplicationRole::Slave).unwrap();
183 assert_eq!(app.role(), Some(ApplicationRole::Slave));
184 app.destroy().await;
185 }
186
187 #[tokio::test]
188 async fn test_set_role_conflict_returns_invalid_state() {
189 let app = make_tcp_app();
190 app.set_role(ApplicationRole::Master).unwrap();
191 let err = app.set_role(ApplicationRole::Slave).unwrap_err();
192 assert!(matches!(err, ModbusError::InvalidState(_)));
193 app.destroy().await;
194 }
195
196 #[tokio::test]
199 async fn test_tcp_encode() {
200 let layer = make_tcp_app();
201 let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
202 let frame = layer.encode(&adu);
203 assert_eq!(frame.len(), 12);
204 assert_eq!(&frame[2..4], [0x00, 0x00]); assert_eq!(u16::from_be_bytes([frame[4], frame[5]]), 6); assert_eq!(frame[6], 1); assert_eq!(frame[7], 0x03); assert_eq!(&frame[8..], [0x00, 0x00, 0x00, 0x0a]);
209 layer.destroy().await;
210 }
211
212 #[tokio::test]
213 async fn test_tcp_encode_with_transaction() {
214 let layer = make_tcp_app();
215 let adu = ApplicationDataUnit::new(1, 0x03, vec![]).with_transaction(42);
216 let frame = layer.encode(&adu);
217 assert_eq!(u16::from_be_bytes([frame[0], frame[1]]), 42);
218 layer.destroy().await;
219 }
220
221 #[tokio::test]
222 async fn test_tcp_decode() {
223 let layer = make_tcp_app();
224 let frame = vec![0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
225 let decoded = layer.decode(&frame).unwrap();
226 assert_eq!(decoded.adu.transaction, Some(1));
227 assert_eq!(decoded.adu.unit, 1);
228 assert_eq!(decoded.adu.fc, 0x03);
229 assert_eq!(decoded.adu.data, vec![0x00, 0x0a]);
230 layer.destroy().await;
231 }
232
233 #[tokio::test]
234 async fn test_tcp_decode_invalid_protocol() {
235 let layer = make_tcp_app();
236 let frame = vec![0x00, 0x01, 0x00, 0x01, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
237 assert!(matches!(
238 layer.decode(&frame),
239 Err(ModbusError::InvalidData)
240 ));
241 layer.destroy().await;
242 }
243
244 #[tokio::test]
245 async fn test_tcp_roundtrip() {
246 let layer = make_tcp_app();
247 let adu =
248 ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]).with_transaction(5);
249 let encoded = layer.encode(&adu);
250 let decoded = layer.decode(&encoded).unwrap();
251 assert_eq!(decoded.adu.transaction, Some(5));
252 assert_eq!(decoded.adu.unit, 1);
253 assert_eq!(decoded.adu.fc, 0x03);
254 assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
255 layer.destroy().await;
256 }
257
258 #[tokio::test]
259 async fn test_rtu_encode() {
260 let layer = make_rtu_app();
261 let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
262 let frame = layer.encode(&adu);
263 assert_eq!(frame.len(), 8);
264 assert_eq!(frame[0], 1);
265 assert_eq!(frame[1], 0x03);
266 assert_eq!(&frame[2..6], [0x00, 0x00, 0x00, 0x0a]);
267 let crc_val = u16::from_le_bytes([frame[6], frame[7]]);
268 assert_eq!(crate::utils::crc(&frame[..6]), crc_val);
269 layer.destroy().await;
270 }
271
272 #[tokio::test]
273 async fn test_rtu_decode() {
274 let layer = make_rtu_app();
275 let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
276 let frame = layer.encode(&adu);
277 let decoded = layer.decode(&frame).unwrap();
278 assert_eq!(decoded.adu.unit, 1);
279 assert_eq!(decoded.adu.fc, 0x03);
280 assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
281 layer.destroy().await;
282 }
283
284 #[tokio::test]
285 async fn test_rtu_decode_crc_fail() {
286 let layer = make_rtu_app();
287 let frame = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a, 0xFF, 0xFF];
288 assert!(matches!(
289 layer.decode(&frame),
290 Err(ModbusError::CrcCheckFailed)
291 ));
292 layer.destroy().await;
293 }
294
295 #[tokio::test]
296 async fn test_rtu_roundtrip() {
297 let layer = make_rtu_app();
298 let adu = ApplicationDataUnit::new(
299 17,
300 0x10,
301 vec![0x00, 0x01, 0x00, 0x02, 0x04, 0xAB, 0xCD, 0xEF, 0x01],
302 );
303 let encoded = layer.encode(&adu);
304 let decoded = layer.decode(&encoded).unwrap();
305 assert_eq!(decoded.adu.unit, 17);
306 assert_eq!(decoded.adu.fc, 0x10);
307 assert_eq!(decoded.adu.data, adu.data);
308 layer.destroy().await;
309 }
310
311 #[tokio::test]
312 async fn test_ascii_encode() {
313 let layer = make_ascii_app();
314 let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
315 let frame = layer.encode(&adu);
316 let frame_str = String::from_utf8(frame.clone()).unwrap();
317 assert!(frame_str.starts_with(':'));
318 assert!(frame_str.ends_with("\r\n"));
319 assert_eq!(frame_str.len(), 1 + 2 + 2 + 8 + 2 + 2);
320 layer.destroy().await;
321 }
322
323 #[tokio::test]
324 async fn test_ascii_decode() {
325 let layer = make_ascii_app();
326 let adu = ApplicationDataUnit::new(1, 0x03, vec![0x00, 0x00, 0x00, 0x0a]);
327 let encoded = layer.encode(&adu);
328 let decoded = layer.decode(&encoded).unwrap();
329 assert_eq!(decoded.adu.unit, 1);
330 assert_eq!(decoded.adu.fc, 0x03);
331 assert_eq!(decoded.adu.data, vec![0x00, 0x00, 0x00, 0x0a]);
332 layer.destroy().await;
333 }
334
335 #[tokio::test]
336 async fn test_ascii_roundtrip() {
337 let layer = make_ascii_app();
338 let adu = ApplicationDataUnit::new(
339 17,
340 0x10,
341 vec![0x00, 0x01, 0x00, 0x02, 0x04, 0xAB, 0xCD, 0xEF, 0x01],
342 );
343 let encoded = layer.encode(&adu);
344 let decoded = layer.decode(&encoded).unwrap();
345 assert_eq!(decoded.adu.unit, 17);
346 assert_eq!(decoded.adu.fc, 0x10);
347 assert_eq!(decoded.adu.data, adu.data);
348 layer.destroy().await;
349 }
350
351 #[tokio::test]
352 async fn test_ascii_decode_lrc_fail() {
353 let layer = make_ascii_app();
354 let frame = b":01030000000AFF\r\n";
355 assert!(matches!(
356 layer.decode(frame),
357 Err(ModbusError::LrcCheckFailed)
358 ));
359 layer.destroy().await;
360 }
361
362 #[tokio::test]
365 async fn test_framing_emits_on_valid_tcp_frame() {
366 let server = TcpServerPhysicalLayer::new();
367 server.set_addr("127.0.0.1:0".to_string()).await;
368 server.open(None).await.unwrap();
369 let application = TcpApplicationLayer::new(server.clone());
370
371 let client = TcpClientPhysicalLayer::new();
373 client.set_addr(server.get_addr().await.unwrap()).await;
374 client.open(None).await.unwrap();
375 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
376
377 let mut framing_rx = application.subscribe_framing();
378
379 let frame = vec![0x00, 0x07, 0x00, 0x00, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
380 client.write(&frame).await.unwrap();
381
382 let f = tokio::time::timeout(tokio::time::Duration::from_secs(2), framing_rx.recv())
383 .await
384 .expect("framing event within 2s")
385 .expect("framing channel still open");
386 assert_eq!(f.adu.transaction, Some(7));
387 assert_eq!(f.adu.unit, 1);
388 assert_eq!(f.adu.fc, 0x03);
389 assert_eq!(f.adu.data, vec![0x00, 0x0a]);
390
391 client.destroy().await;
392 server.destroy().await;
393 application.destroy().await;
394 }
395
396 #[tokio::test]
397 async fn test_framing_error_on_invalid_tcp_protocol() {
398 let server = TcpServerPhysicalLayer::new();
399 server.set_addr("127.0.0.1:0".to_string()).await;
400 server.open(None).await.unwrap();
401 let application = TcpApplicationLayer::new(server.clone());
402
403 let client = TcpClientPhysicalLayer::new();
404 client.set_addr(server.get_addr().await.unwrap()).await;
405 client.open(None).await.unwrap();
406 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
407
408 let mut err_rx = application.subscribe_framing_error();
409
410 let bogus = vec![0x00, 0x07, 0x12, 0x34, 0x00, 0x04, 0x01, 0x03, 0x00, 0x0a];
412 client.write(&bogus).await.unwrap();
413
414 let err = tokio::time::timeout(tokio::time::Duration::from_secs(2), err_rx.recv())
415 .await
416 .expect("framing_error event within 2s")
417 .expect("framing_error channel still open");
418 assert!(matches!(err, ModbusError::InvalidData));
419
420 client.destroy().await;
421 server.destroy().await;
422 application.destroy().await;
423 }
424}