use crate::error::ModbusError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Identifier naming a single connection in data and connection-close events.
///
/// Stored as an `Arc<str>`, so cloning it bumps a reference count instead of
/// copying the underlying string.
pub type ConnectionId = Arc<str>;
/// Broad category of transport that a [`PhysicalLayer`] implementation uses.
///
/// The type is a plain `Copy` tag that can be compared and hashed, which makes
/// it usable as a key in `HashMap`/`HashSet`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PhysicalLayerType {
    /// Serial-line transport.
    Serial,
    /// Network transport; the TCP and UDP layers in this module report this value.
    Net,
}
/// Shared, thread-safe callback that takes a byte buffer and returns a boxed
/// `Send` future resolving to `Result<(), ModbusError>`.
///
/// Carried in the `response` field of [`DataEvent`]. The tests in this file
/// call it to send a reply that is then observed by the peer that produced
/// the event.
pub type ResponseFn = Arc<
dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), ModbusError>> + Send>> + Send + Sync,
>;
/// Event delivered to `subscribe_data` receivers when a layer receives bytes.
///
/// Cloning copies `data` and bumps the reference counts of the two `Arc`
/// fields.
#[derive(Clone)]
pub struct DataEvent {
/// The received bytes.
pub data: Vec<u8>,
/// Callback for sending bytes back in reply to this event.
// NOTE(review): presumably routed to the originating peer/connection — confirm
// in each implementation.
pub response: ResponseFn,
/// Identifier of the connection associated with this event.
// NOTE(review): the TCP server test expects a non-empty id; what other layers
// put here is not visible in this file — confirm.
pub connection: ConnectionId,
}
/// Common asynchronous interface implemented by the transports in this module.
///
/// Every method takes `&self` and implementors must be `Send + Sync`. Events
/// are exposed through `tokio::sync::broadcast` receivers; a broadcast
/// receiver only observes values sent after it was created, so subscribe
/// before triggering the traffic of interest.
#[async_trait::async_trait]
pub trait PhysicalLayer: Send + Sync {
/// Options accepted by [`PhysicalLayer::open`].
type OpenOptions: Default + Send + Sync;
/// Returns the transport category of this layer.
fn layer_type(&self) -> PhysicalLayerType;
/// Opens the layer using `options`.
///
/// # Errors
/// Returns a `ModbusError` if the layer cannot be opened.
async fn open(&self, options: Self::OpenOptions) -> Result<(), ModbusError>;
/// Writes `data` to the layer.
///
/// # Errors
/// Returns a `ModbusError` on failure. The tests in this file expect
/// `ModbusError::PortNotOpen` from a TCP client that was never opened and
/// `ModbusError::NotSupported` from the TCP server layer.
async fn write(&self, data: &[u8]) -> Result<(), ModbusError>;
/// Closes the layer.
///
/// # Errors
/// Returns a `ModbusError` if closing fails.
// NOTE(review): whether a closed layer may be opened again is not visible
// here — confirm per implementation.
async fn close(&self) -> Result<(), ModbusError>;
/// Tears the layer down; unlike `close`, this cannot report an error.
// NOTE(review): presumably permanent (see `is_destroyed`) — confirm.
async fn destroy(&self);
/// Reports whether the layer is currently open.
fn is_open(&self) -> bool;
/// Reports whether the layer has been destroyed.
// NOTE(review): presumably becomes `true` after `destroy` — confirm.
fn is_destroyed(&self) -> bool;
/// Returns a new receiver for incoming-data events.
fn subscribe_data(&self) -> broadcast::Receiver<DataEvent>;
/// Returns a new receiver for byte buffers associated with writes.
// NOTE(review): presumably the bytes passed to a successful `write` — confirm.
fn subscribe_write(&self) -> broadcast::Receiver<Vec<u8>>;
/// Returns a new receiver for errors reported by the layer.
fn subscribe_error(&self) -> broadcast::Receiver<ModbusError>;
/// Returns a new receiver for the ids of connections that have closed.
///
/// The TCP server test in this file expects an id here after a connected
/// client is destroyed.
fn subscribe_connection_close(&self) -> broadcast::Receiver<ConnectionId>;
/// Returns a new receiver for layer-close notifications.
// NOTE(review): presumably fired when the whole layer closes, as opposed to a
// single connection — confirm.
fn subscribe_close(&self) -> broadcast::Receiver<()>;
}
// Network transports; these are always compiled.
mod tcp_client;
mod tcp_server;
mod udp;
pub use tcp_client::TcpClientPhysicalLayer;
pub use tcp_server::TcpServerPhysicalLayer;
pub use udp::{UdpPhysicalLayer, UdpPhysicalLayerOptions};
// Serial transport; compiled only when the `serial` cargo feature is enabled.
#[cfg(feature = "serial")]
mod serial;
#[cfg(feature = "serial")]
pub use serial::{SerialPhysicalLayer, SerialPhysicalLayerOptions};
// Re-exported from the `serialport` crate so callers can name these types
// through this module (presumably for filling in serial options — confirm).
#[cfg(feature = "serial")]
pub use serialport::{DataBits, FlowControl, Parity, StopBits};
#[cfg(test)]
mod tests {
use super::*;
/// A variant compares equal to itself and unequal to the other variant.
#[test]
fn test_physical_layer_type_equality() {
    let serial = PhysicalLayerType::Serial;
    let net = PhysicalLayerType::Net;

    assert_eq!(serial, PhysicalLayerType::Serial);
    assert_ne!(serial, net);
}
/// Cloning a `DataEvent` must keep the payload and share both `Arc` handles.
#[test]
fn test_data_event_clone_preserves_fields() {
    let response: ResponseFn = Arc::new(|_| Box::pin(async { Ok(()) }));
    let conn: ConnectionId = Arc::from("test-conn-1");
    let event = DataEvent {
        data: vec![1, 2, 3],
        response: Arc::clone(&response),
        connection: Arc::clone(&conn),
    };

    let cloned = event.clone();

    // Payload and connection text survive the clone.
    assert_eq!(cloned.data, vec![1, 2, 3]);
    assert_eq!(&*cloned.connection, "test-conn-1");

    // `response` has no `PartialEq`, so pointer identity is how we verify that
    // the callback was carried over rather than dropped or replaced.
    assert!(
        Arc::ptr_eq(&cloned.response, &response),
        "clone should share the response callback"
    );
    // The connection id should be shared, not re-allocated.
    assert!(
        Arc::ptr_eq(&cloned.connection, &conn),
        "clone should share the connection id allocation"
    );

    // The source event is left intact by cloning.
    assert_eq!(event.data, cloned.data);
    assert!(Arc::ptr_eq(&event.connection, &cloned.connection));
}
/// Cloning a `ConnectionId` yields a second handle to the same allocation.
#[test]
fn test_connection_id_is_cheap_to_clone() {
    let original: ConnectionId = Arc::from("hello");
    let duplicate = Arc::clone(&original);

    // Both handles read back the same text...
    for handle in [&original, &duplicate] {
        assert_eq!(&**handle, "hello");
    }
    // ...because they point at the very same string.
    assert!(Arc::ptr_eq(&original, &duplicate));
}
/// Round-trips a request and its reply between a TCP client and a TCP server
/// bound to an ephemeral loopback port.
#[tokio::test]
async fn test_tcp_client_server_communication() {
    use tokio::time::{sleep, timeout, Duration};

    // Upper bound for each receive so a lost message fails the test instead of hanging it.
    let recv_deadline = Duration::from_secs(2);

    // --- server side ---
    let server = TcpServerPhysicalLayer::new();
    server.set_addr(String::from("127.0.0.1:0")).await;
    server.open(None).await.unwrap();
    assert_eq!(server.layer_type(), PhysicalLayerType::Net);
    sleep(Duration::from_millis(50)).await;

    // --- client side, pointed at whatever port the server ended up on ---
    let client = TcpClientPhysicalLayer::new();
    let server_addr = server.get_addr().await.unwrap();
    client.set_addr(server_addr).await;
    client.open(None).await.unwrap();
    assert_eq!(client.layer_type(), PhysicalLayerType::Net);

    // Subscribe before any traffic is generated.
    let mut server_rx = server.subscribe_data();
    let mut client_rx = client.subscribe_data();
    sleep(Duration::from_millis(100)).await;

    // --- request: client -> server ---
    let request = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
    client.write(&request).await.unwrap();
    let received = timeout(recv_deadline, server_rx.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(received.data, request);
    assert!(
        !received.connection.is_empty(),
        "server should issue a connection id"
    );

    // --- reply: server -> client, through the event's callback ---
    let reply = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
    (received.response)(reply.clone()).await.unwrap();
    let echoed = timeout(recv_deadline, client_rx.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(echoed.data, reply);

    client.destroy().await;
    server.destroy().await;
}
/// Round-trips a request and its reply between two UDP layers on loopback.
#[tokio::test]
async fn test_udp_communication() {
    use tokio::time::{sleep, timeout, Duration};

    // Upper bound for each receive so a lost datagram fails the test instead of hanging it.
    let recv_deadline = Duration::from_secs(2);

    // --- server side, bound to an ephemeral port ---
    let server = UdpPhysicalLayer::new_server();
    *server.local_addr.lock().await = Some(String::from("127.0.0.1:0"));
    server.open(None).await.unwrap();
    assert_eq!(server.layer_type(), PhysicalLayerType::Net);

    // Read the actual bound address; the guard is released at the end of the block.
    let server_addr = {
        let guard = server.socket.lock().await;
        guard.as_ref().unwrap().local_addr().unwrap()
    };

    // --- client side ---
    let client = UdpPhysicalLayer::new_client(server_addr.to_string());
    *client.local_addr.lock().await = Some(String::from("127.0.0.1:0"));
    client.open(None).await.unwrap();

    // Subscribe before any traffic is generated.
    let mut server_rx = server.subscribe_data();
    let mut client_rx = client.subscribe_data();
    sleep(Duration::from_millis(50)).await;

    // --- request: client -> server ---
    let request = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x0a];
    client.write(&request).await.unwrap();
    let received = timeout(recv_deadline, server_rx.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(received.data, request);

    // --- reply: server -> client, through the event's callback ---
    let reply = vec![0x01, 0x03, 0x02, 0x00, 0x0a];
    (received.response)(reply.clone()).await.unwrap();
    let echoed = timeout(recv_deadline, client_rx.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(echoed.data, reply);

    client.destroy().await;
    server.destroy().await;
}
/// The TCP server publishes a connection id on its connection-close channel
/// once a connected client goes away.
#[tokio::test]
async fn test_tcp_server_emits_connection_close() {
    use tokio::time::{sleep, timeout, Duration};

    let server = TcpServerPhysicalLayer::new();
    server.set_addr(String::from("127.0.0.1:0")).await;
    server.open(None).await.unwrap();

    // Subscribe up front so the close notification cannot be missed.
    let mut close_rx = server.subscribe_connection_close();
    sleep(Duration::from_millis(30)).await;

    // Connect a client, give the server a moment, then drop the connection.
    let client = TcpClientPhysicalLayer::new();
    let server_addr = server.get_addr().await.unwrap();
    client.set_addr(server_addr).await;
    client.open(None).await.unwrap();
    sleep(Duration::from_millis(50)).await;
    client.destroy().await;

    let pending_close = timeout(Duration::from_secs(2), close_rx.recv());
    let closed_id = pending_close
        .await
        .expect("should receive connection_close within 2s")
        .expect("subscribe_connection_close should yield an id");
    assert!(!closed_id.is_empty());

    server.destroy().await;
}
/// Writing through a TCP client that was never opened yields `PortNotOpen`.
#[tokio::test]
async fn test_write_before_open_fails() {
    let unopened = TcpClientPhysicalLayer::new();
    let payload = [0x01_u8];

    let outcome = unopened.write(&payload).await;

    assert!(matches!(outcome, Err(ModbusError::PortNotOpen)));
}
/// Calling `write` directly on the TCP server layer yields `NotSupported`.
#[tokio::test]
async fn test_server_write_not_supported() {
    let listener = TcpServerPhysicalLayer::new();
    let payload = [0x01_u8];

    let outcome = listener.write(&payload).await;

    assert!(matches!(outcome, Err(ModbusError::NotSupported)));
}
}