1use std::net::SocketAddr;
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use tokio::sync::Semaphore;
5
6use crate::{
7 dispatch::dispatch_loop, error::Result, handshake::server_handshake, store::DataStore,
8};
9
/// Settings consumed by [`S7Server::bind`].
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Socket address the TCP listener is bound to.
    pub bind_addr: SocketAddr,
    /// Number of permits given to the server's connection semaphore, i.e. how
    /// many connections may be served at the same time.
    pub max_connections: usize,
}
16
17pub struct S7Server {
19 listener: TcpListener,
20 semaphore: Arc<Semaphore>,
21}
22
23impl S7Server {
24 pub async fn bind(config: ServerConfig) -> Result<Self> {
26 let listener = TcpListener::bind(config.bind_addr).await?;
27 let semaphore = Arc::new(Semaphore::new(config.max_connections));
28 Ok(Self {
29 listener,
30 semaphore,
31 })
32 }
33
34 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
36 self.listener.local_addr()
37 }
38
39 pub async fn serve(self, store: DataStore) -> Result<()> {
41 loop {
42 let permit = Arc::clone(&self.semaphore)
44 .acquire_owned()
45 .await
46 .expect("semaphore closed");
47
48 let (stream, _peer) = self.listener.accept().await?;
49 let store = store.clone();
50
51 tokio::spawn(async move {
52 let _permit = permit; if let Err(e) = serve_one(stream, store).await {
54 let _ = e; }
58 });
59 }
60 }
61}
62
63async fn serve_one(mut stream: TcpStream, store: DataStore) -> Result<()> {
65 stream.set_nodelay(true)?;
66 let pdu_size = server_handshake(&mut stream).await?;
67 dispatch_loop(&mut stream, pdu_size, store).await
68}
69
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use snap7_client::{types::ConnectParams, S7Client};

    use super::*;
    use crate::store::DataStore;

    /// Loopback configuration on port 0, so the OS assigns a free port and
    /// tests do not collide with each other.
    fn make_config() -> ServerConfig {
        ServerConfig {
            bind_addr: "127.0.0.1:0".parse::<SocketAddr>().unwrap(),
            max_connections: 4,
        }
    }

    /// Binds a server backed by `store`, spawns its accept loop and returns the
    /// address clients should connect to.
    ///
    /// No sleep is needed afterwards: the listener is already bound and
    /// listening when this returns, so connection attempts are queued by the OS
    /// until the spawned task accepts them.
    async fn spawn_server(store: DataStore) -> SocketAddr {
        let server = S7Server::bind(make_config()).await.unwrap();
        let addr = server.local_addr().unwrap();
        tokio::spawn(server.serve(store));
        addr
    }

    #[tokio::test]
    async fn server_accepts_s7client_connection() {
        // Seed the store before the server starts, then read it back over the wire.
        let store = DataStore::new();
        store.write_bytes(1, 0, &[0x12, 0x34]);

        let addr = spawn_server(store).await;

        let params = ConnectParams {
            rack: 0,
            slot: 1,
            ..ConnectParams::default()
        };
        let client = S7Client::connect(addr, params).await.unwrap();
        let data = client.db_read(1, 0, 2).await.unwrap();
        assert_eq!(&data[..], &[0x12, 0x34]);
    }

    #[tokio::test]
    async fn server_write_then_read() {
        let addr = spawn_server(DataStore::new()).await;

        let client = S7Client::connect(addr, ConnectParams::default())
            .await
            .unwrap();

        // A value written through the client must be readable through it again.
        client.db_write(2, 10, &[0xAB, 0xCD]).await.unwrap();
        let data = client.db_read(2, 10, 2).await.unwrap();
        assert_eq!(&data[..], &[0xAB, 0xCD]);
    }
}