rs_modbus/layers/physical/
mod.rs1use crate::error::ModbusError;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use tokio::sync::broadcast;
6
7pub type ConnectionId = Arc<str>;
10
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
15pub enum PhysicalLayerType {
16 Serial,
17 Net,
18}
19
20pub type ResponseFn = Arc<
24 dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>> + Send + Sync,
25>;
26
27#[derive(Clone)]
31pub struct DataEvent {
32 pub data: Vec<u8>,
33 pub response: ResponseFn,
34 pub connection: ConnectionId,
35}
36
37#[async_trait::async_trait]
38pub trait PhysicalLayer: Send + Sync {
39 type OpenOptions: Default + Send + Sync;
42
43 fn layer_type(&self) -> PhysicalLayerType;
46
47 async fn open(&self, options: Self::OpenOptions) -> Result<(), ModbusError>;
48 async fn write(&self, data: &[u8]) -> Result<(), ModbusError>;
49 async fn close(&self) -> Result<(), ModbusError>;
50 async fn destroy(&self);
51
52 fn is_open(&self) -> bool;
53 fn is_destroyed(&self) -> bool;
54
55 fn subscribe_data(&self) -> broadcast::Receiver<DataEvent>;
58
59 fn subscribe_write(&self) -> broadcast::Receiver<Vec<u8>>;
61
62 fn subscribe_error(&self) -> broadcast::Receiver<ModbusError>;
63
64 fn subscribe_connection_close(&self) -> broadcast::Receiver<ConnectionId>;
69
70 fn subscribe_close(&self) -> broadcast::Receiver<()>;
71}
72
73mod tcp_client;
74mod tcp_server;
75mod udp;
76
77pub use tcp_client::TcpClientPhysicalLayer;
78pub use tcp_server::TcpServerPhysicalLayer;
79pub use udp::{UdpPhysicalLayer, UdpPhysicalLayerOptions};
80
81#[cfg(feature = "serial")]
82mod serial;
83#[cfg(feature = "serial")]
84pub use serial::{SerialPhysicalLayer, SerialPhysicalLayerOptions};
85#[cfg(feature = "serial")]
86pub use serialport::{DataBits, FlowControl, Parity, StopBits};
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93
94 #[test]
97 fn test_physical_layer_type_equality() {
98 assert_eq!(PhysicalLayerType::Serial, PhysicalLayerType::Serial);
99 assert_ne!(PhysicalLayerType::Serial, PhysicalLayerType::Net);
100 }
101
102 #[test]
103 fn test_data_event_clone_preserves_fields() {
104 let response: ResponseFn = Arc::new(|_| Box::pin(async { Ok(()) }));
105 let conn: ConnectionId = Arc::from("test-conn-1");
106 let event = DataEvent {
107 data: vec![1, 2, 3],
108 response: Arc::clone(&response),
109 connection: Arc::clone(&conn),
110 };
111 let cloned = event.clone();
112 assert_eq!(cloned.data, vec![1, 2, 3]);
113 assert_eq!(&*cloned.connection, "test-conn-1");
114 }
115
116 #[test]
117 fn test_connection_id_is_cheap_to_clone() {
118 let id: ConnectionId = Arc::from("hello");
120 let cloned = Arc::clone(&id);
121 assert_eq!(&*id, "hello");
122 assert_eq!(&*cloned, "hello");
123 assert!(Arc::ptr_eq(&id, &cloned));
125 }
126
127 #[tokio::test]
128 async fn test_tcp_client_server_communication() {
129 let server = TcpServerPhysicalLayer::new();
130 server.set_addr("127.0.0.1:0".to_string()).await;
131 server.open(None).await.unwrap();
132 assert_eq!(server.layer_type(), PhysicalLayerType::Net);
133
134 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
135
136 let client = TcpClientPhysicalLayer::new();
137 client.set_addr(server.get_addr().await.unwrap()).await;
138 client.open(None).await.unwrap();
139 assert_eq!(client.layer_type(), PhysicalLayerType::Net);
140
141 let mut server_rx = server.subscribe_data();
142 let mut client_rx = client.subscribe_data();
143 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
144
145 let test_data = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
146 client.write(&test_data).await.unwrap();
147
148 let event = tokio::time::timeout(tokio::time::Duration::from_secs(2), server_rx.recv())
149 .await
150 .unwrap()
151 .unwrap();
152 assert_eq!(event.data, test_data);
153 assert!(
154 !event.connection.is_empty(),
155 "server should issue a connection id"
156 );
157
158 let response_data = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
159 (event.response)(response_data.clone()).await.unwrap();
160
161 let client_event =
162 tokio::time::timeout(tokio::time::Duration::from_secs(2), client_rx.recv())
163 .await
164 .unwrap()
165 .unwrap();
166 assert_eq!(client_event.data, response_data);
167
168 client.destroy().await;
169 server.destroy().await;
170 }
171
172 #[tokio::test]
173 async fn test_udp_communication() {
174 let server = UdpPhysicalLayer::new_server();
175 *server.local_addr.lock().await = Some("127.0.0.1:0".to_string());
176 server.open(None).await.unwrap();
177 assert_eq!(server.layer_type(), PhysicalLayerType::Net);
178
179 let server_addr = {
180 let socket = server.socket.lock().await;
181 socket.as_ref().unwrap().local_addr().unwrap()
182 };
183
184 let client = UdpPhysicalLayer::new_client(server_addr.to_string());
185 *client.local_addr.lock().await = Some("127.0.0.1:0".to_string());
186 client.open(None).await.unwrap();
187
188 let mut server_rx = server.subscribe_data();
189 let mut client_rx = client.subscribe_data();
190 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
191
192 let test_data = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
193 client.write(&test_data).await.unwrap();
194
195 let event = tokio::time::timeout(tokio::time::Duration::from_secs(2), server_rx.recv())
196 .await
197 .unwrap()
198 .unwrap();
199 assert_eq!(event.data, test_data);
200
201 let response_data = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
202 (event.response)(response_data.clone()).await.unwrap();
203
204 let client_event =
205 tokio::time::timeout(tokio::time::Duration::from_secs(2), client_rx.recv())
206 .await
207 .unwrap()
208 .unwrap();
209 assert_eq!(client_event.data, response_data);
210
211 client.destroy().await;
212 server.destroy().await;
213 }
214
215 #[tokio::test]
216 async fn test_tcp_server_emits_connection_close() {
217 let server = TcpServerPhysicalLayer::new();
218 server.set_addr("127.0.0.1:0".to_string()).await;
219 server.open(None).await.unwrap();
220
221 let mut close_rx = server.subscribe_connection_close();
222
223 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
224
225 let client = TcpClientPhysicalLayer::new();
226 client.set_addr(server.get_addr().await.unwrap()).await;
227 client.open(None).await.unwrap();
228 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
229
230 client.destroy().await;
231
232 let closed_id = tokio::time::timeout(tokio::time::Duration::from_secs(2), close_rx.recv())
233 .await
234 .expect("should receive connection_close within 2s")
235 .expect("subscribe_connection_close should yield an id");
236 assert!(!closed_id.is_empty());
237
238 server.destroy().await;
239 }
240
241 #[tokio::test]
242 async fn test_write_before_open_fails() {
243 let client = TcpClientPhysicalLayer::new();
244 let result = client.write(&[0x01]).await;
245 assert!(matches!(result, Err(ModbusError::PortNotOpen)));
246 }
247
248 #[tokio::test]
249 async fn test_server_write_not_supported() {
250 let server = TcpServerPhysicalLayer::new();
251 let result = server.write(&[0x01]).await;
252 assert!(matches!(result, Err(ModbusError::NotSupported)));
253 }
254}