Skip to main content

rs_modbus/layers/physical/
mod.rs

1use crate::error::ModbusError;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use tokio::sync::broadcast;
6
7/// Identifies a single connection (socket / serial port / udp peer) within a
8/// physical layer. Cheaply cloneable via `Arc`.
9pub type ConnectionId = Arc<str>;
10
11/// Differentiates serial transports (where RTU needs 3.5T inter-frame timing)
12/// from network transports (where TCP/UDP delivery boundaries are already
13/// message-aligned in practice).
14#[derive(Clone, Copy, Debug, PartialEq, Eq)]
15pub enum PhysicalLayerType {
16    Serial,
17    Net,
18}
19
20/// Per-message reply closure. The physical layer hands this to upper layers so
21/// they can write a response back to the originating connection without
22/// exposing connection-id details.
23pub type ResponseFn = Arc<
24    dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>> + Send + Sync,
25>;
26
27/// Payload of the `subscribe_data` broadcast channel. Carries the bytes that
28/// arrived, the reply closure for the originating connection, and the
29/// connection identifier so upper layers can demultiplex.
30#[derive(Clone)]
31pub struct DataEvent {
32    pub data: Vec<u8>,
33    pub response: ResponseFn,
34    pub connection: ConnectionId,
35}
36
37#[async_trait::async_trait]
38pub trait PhysicalLayer: Send + Sync {
39    /// Options forwarded to [`PhysicalLayer::open`] from `ModbusMaster::open` /
40    /// `ModbusSlave::open`. Mirrors njs-modbus `open(...args)` semantics.
41    type OpenOptions: Default + Send + Sync;
42
43    /// Distinguishes serial vs network transports. Used by the RTU
44    /// application layer to decide whether to apply 3.5T inter-frame timing.
45    fn layer_type(&self) -> PhysicalLayerType;
46
47    async fn open(&self, options: Self::OpenOptions) -> Result<(), ModbusError>;
48    async fn write(&self, data: &[u8]) -> Result<(), ModbusError>;
49    async fn close(&self) -> Result<(), ModbusError>;
50    async fn destroy(&self);
51
52    fn is_open(&self) -> bool;
53    fn is_destroyed(&self) -> bool;
54
55    /// Subscribe to incoming bytes from any connection. Each event carries the
56    /// connection id so upper layers can demultiplex.
57    fn subscribe_data(&self) -> broadcast::Receiver<DataEvent>;
58
59    /// Subscribe to outgoing bytes written via [`write`]. Useful for logging.
60    fn subscribe_write(&self) -> broadcast::Receiver<Vec<u8>>;
61
62    fn subscribe_error(&self) -> broadcast::Receiver<ModbusError>;
63
64    /// Subscribe to individual connection-level close events (a single socket
65    /// disconnecting in a TCP server, the serial port closing, etc.). Separate
66    /// from `subscribe_close` which fires when the whole physical layer shuts
67    /// down.
68    fn subscribe_connection_close(&self) -> broadcast::Receiver<ConnectionId>;
69
70    fn subscribe_close(&self) -> broadcast::Receiver<()>;
71}
72
73mod tcp_client;
74mod tcp_server;
75mod udp;
76
77pub use tcp_client::TcpClientPhysicalLayer;
78pub use tcp_server::TcpServerPhysicalLayer;
79pub use udp::{UdpPhysicalLayer, UdpPhysicalLayerOptions};
80
81#[cfg(feature = "serial")]
82mod serial;
83#[cfg(feature = "serial")]
84pub use serial::{SerialPhysicalLayer, SerialPhysicalLayerOptions};
85#[cfg(feature = "serial")]
86/// Re-exported so callers can set data/stop bits, parity, and flow control
87/// without adding `serialport` as a direct dependency.
88pub use serialport::{DataBits, FlowControl, Parity, StopBits};
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93
94    // ===== Base types =====
95
96    #[test]
97    fn test_physical_layer_type_equality() {
98        assert_eq!(PhysicalLayerType::Serial, PhysicalLayerType::Serial);
99        assert_ne!(PhysicalLayerType::Serial, PhysicalLayerType::Net);
100    }
101
102    #[test]
103    fn test_data_event_clone_preserves_fields() {
104        let response: ResponseFn = Arc::new(|_| Box::pin(async { Ok(()) }));
105        let conn: ConnectionId = Arc::from("test-conn-1");
106        let event = DataEvent {
107            data: vec![1, 2, 3],
108            response: Arc::clone(&response),
109            connection: Arc::clone(&conn),
110        };
111        let cloned = event.clone();
112        assert_eq!(cloned.data, vec![1, 2, 3]);
113        assert_eq!(&*cloned.connection, "test-conn-1");
114    }
115
116    #[test]
117    fn test_connection_id_is_cheap_to_clone() {
118        // ConnectionId should be Arc<str> so clone is O(1)
119        let id: ConnectionId = Arc::from("hello");
120        let cloned = Arc::clone(&id);
121        assert_eq!(&*id, "hello");
122        assert_eq!(&*cloned, "hello");
123        // Both should point to same allocation
124        assert!(Arc::ptr_eq(&id, &cloned));
125    }
126
127    #[tokio::test]
128    async fn test_tcp_client_server_communication() {
129        let server = TcpServerPhysicalLayer::new();
130        server.set_addr("127.0.0.1:0".to_string()).await;
131        server.open(None).await.unwrap();
132        assert_eq!(server.layer_type(), PhysicalLayerType::Net);
133
134        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
135
136        let client = TcpClientPhysicalLayer::new();
137        client.set_addr(server.get_addr().await.unwrap()).await;
138        client.open(None).await.unwrap();
139        assert_eq!(client.layer_type(), PhysicalLayerType::Net);
140
141        let mut server_rx = server.subscribe_data();
142        let mut client_rx = client.subscribe_data();
143        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
144
145        let test_data = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
146        client.write(&test_data).await.unwrap();
147
148        let event = tokio::time::timeout(tokio::time::Duration::from_secs(2), server_rx.recv())
149            .await
150            .unwrap()
151            .unwrap();
152        assert_eq!(event.data, test_data);
153        assert!(
154            !event.connection.is_empty(),
155            "server should issue a connection id"
156        );
157
158        let response_data = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
159        (event.response)(response_data.clone()).await.unwrap();
160
161        let client_event =
162            tokio::time::timeout(tokio::time::Duration::from_secs(2), client_rx.recv())
163                .await
164                .unwrap()
165                .unwrap();
166        assert_eq!(client_event.data, response_data);
167
168        client.destroy().await;
169        server.destroy().await;
170    }
171
172    #[tokio::test]
173    async fn test_udp_communication() {
174        let server = UdpPhysicalLayer::new_server();
175        *server.local_addr.lock().await = Some("127.0.0.1:0".to_string());
176        server.open(None).await.unwrap();
177        assert_eq!(server.layer_type(), PhysicalLayerType::Net);
178
179        let server_addr = {
180            let socket = server.socket.lock().await;
181            socket.as_ref().unwrap().local_addr().unwrap()
182        };
183
184        let client = UdpPhysicalLayer::new_client(server_addr.to_string());
185        *client.local_addr.lock().await = Some("127.0.0.1:0".to_string());
186        client.open(None).await.unwrap();
187
188        let mut server_rx = server.subscribe_data();
189        let mut client_rx = client.subscribe_data();
190        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
191
192        let test_data = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
193        client.write(&test_data).await.unwrap();
194
195        let event = tokio::time::timeout(tokio::time::Duration::from_secs(2), server_rx.recv())
196            .await
197            .unwrap()
198            .unwrap();
199        assert_eq!(event.data, test_data);
200
201        let response_data = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
202        (event.response)(response_data.clone()).await.unwrap();
203
204        let client_event =
205            tokio::time::timeout(tokio::time::Duration::from_secs(2), client_rx.recv())
206                .await
207                .unwrap()
208                .unwrap();
209        assert_eq!(client_event.data, response_data);
210
211        client.destroy().await;
212        server.destroy().await;
213    }
214
215    #[tokio::test]
216    async fn test_tcp_server_emits_connection_close() {
217        let server = TcpServerPhysicalLayer::new();
218        server.set_addr("127.0.0.1:0".to_string()).await;
219        server.open(None).await.unwrap();
220
221        let mut close_rx = server.subscribe_connection_close();
222
223        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
224
225        let client = TcpClientPhysicalLayer::new();
226        client.set_addr(server.get_addr().await.unwrap()).await;
227        client.open(None).await.unwrap();
228        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
229
230        client.destroy().await;
231
232        let closed_id = tokio::time::timeout(tokio::time::Duration::from_secs(2), close_rx.recv())
233            .await
234            .expect("should receive connection_close within 2s")
235            .expect("subscribe_connection_close should yield an id");
236        assert!(!closed_id.is_empty());
237
238        server.destroy().await;
239    }
240
241    #[tokio::test]
242    async fn test_write_before_open_fails() {
243        let client = TcpClientPhysicalLayer::new();
244        let result = client.write(&[0x01]).await;
245        assert!(matches!(result, Err(ModbusError::PortNotOpen)));
246    }
247
248    #[tokio::test]
249    async fn test_server_write_not_supported() {
250        let server = TcpServerPhysicalLayer::new();
251        let result = server.write(&[0x01]).await;
252        assert!(matches!(result, Err(ModbusError::NotSupported)));
253    }
254}