1use std::net::SocketAddr;
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use tokio::sync::Semaphore;
5
6use crate::{
7 dispatch::dispatch_loop,
8 error::Result,
9 handshake::server_handshake,
10 store::{CpuState, DataStore, EventInfo, ServerStatus},
11};
12
13#[derive(Debug, Clone)]
15pub struct ServerConfig {
16 pub bind_addr: SocketAddr,
17 pub max_connections: usize,
18}
19
20pub struct S7Server {
22 listener: TcpListener,
23 semaphore: Arc<Semaphore>,
24}
25
26impl S7Server {
27 pub async fn bind(config: ServerConfig) -> Result<Self> {
29 let listener = TcpListener::bind(config.bind_addr).await?;
30 let semaphore = Arc::new(Semaphore::new(config.max_connections));
31 Ok(Self { listener, semaphore })
32 }
33
34 pub async fn start_to(addr: &str, max_connections: usize) -> Result<Self> {
36 let bind_addr: SocketAddr = addr.parse().map_err(|e| {
37 crate::error::Error::Io(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
38 })?;
39 Self::bind(ServerConfig { bind_addr, max_connections }).await
40 }
41
42 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
44 self.listener.local_addr()
45 }
46
47 pub async fn serve(self, store: DataStore) -> Result<()> {
49 loop {
50 let permit = Arc::clone(&self.semaphore)
51 .acquire_owned()
52 .await
53 .expect("semaphore closed");
54
55 let (stream, _peer) = self.listener.accept().await?;
56 let store = store.clone();
57
58 tokio::spawn(async move {
59 let _permit = permit;
60 store.client_connected();
61 let _ = serve_one(stream, store.clone()).await;
62 store.client_disconnected();
63 });
64 }
65 }
66
67 pub fn get_status(store: &DataStore) -> ServerStatus {
71 store.get_status()
72 }
73
74 pub fn set_cpu_status(store: &DataStore, state: CpuState) {
76 store.set_cpu_state(state);
77 }
78
79 pub fn lock_area(store: &DataStore, area_code: u8) {
81 store.lock_area(area_code);
82 }
83
84 pub fn unlock_area(store: &DataStore, area_code: u8) {
86 store.unlock_area(area_code);
87 }
88
89 pub fn clear_events(store: &DataStore) {
91 store.clear_events();
92 }
93
94 pub fn pick_event(store: &DataStore) -> Option<EventInfo> {
96 store.pick_event()
97 }
98
99 pub fn get_mask(store: &DataStore) -> u32 {
101 store.get_mask()
102 }
103
104 pub fn set_mask(store: &DataStore, mask: u32) {
106 store.set_mask(mask);
107 }
108}
109
110async fn serve_one(mut stream: TcpStream, store: DataStore) -> Result<()> {
112 stream.set_nodelay(true)?;
113 let pdu_size = server_handshake(&mut stream).await?;
114 dispatch_loop(&mut stream, pdu_size, store).await
115}
116
117#[cfg(test)]
122mod tests {
123 use std::net::SocketAddr;
124
125 use snap7_client::{types::ConnectParams, S7Client};
126
127 use super::*;
128 use crate::store::DataStore;
129
130 fn make_config() -> ServerConfig {
131 ServerConfig {
132 bind_addr: "127.0.0.1:0".parse::<SocketAddr>().unwrap(),
133 max_connections: 4,
134 }
135 }
136
137 #[tokio::test]
138 async fn server_accepts_s7client_connection() {
139 let store = DataStore::new();
140 store.write_bytes(1, 0, &[0x12, 0x34]);
141
142 let server = S7Server::bind(make_config()).await.unwrap();
143 let addr = server.local_addr().unwrap();
144
145 tokio::spawn(server.serve(store));
146
147 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
148
149 let params = ConnectParams {
150 rack: 0,
151 slot: 1,
152 ..ConnectParams::default()
153 };
154 let client = S7Client::connect(addr, params).await.unwrap();
155 let data = client.db_read(1, 0, 2).await.unwrap();
156 assert_eq!(&data[..], &[0x12, 0x34]);
157 }
158
159 #[test]
160 fn get_status_reflects_cpu_state() {
161 let store = DataStore::new();
162 let status = S7Server::get_status(&store);
163 assert_eq!(status.cpu_state, crate::store::CpuState::Stop);
164 assert_eq!(status.clients_count, 0);
165 assert!(status.server_running);
166 }
167
168 #[test]
169 fn lock_area_blocks_writes() {
170 let store = DataStore::new();
171 store.write_bytes(1, 0, &[0xAA]);
172 S7Server::lock_area(&store, crate::store::area::DATA_BLOCK);
173 store.write_bytes(1, 0, &[0xFF]); let data = store.read_bytes(1, 0, 1);
175 assert_eq!(data, vec![0xAA]);
176 S7Server::unlock_area(&store, crate::store::area::DATA_BLOCK);
177 store.write_bytes(1, 0, &[0xFF]);
178 let data = store.read_bytes(1, 0, 1);
179 assert_eq!(data, vec![0xFF]);
180 }
181
182 #[test]
183 fn event_mask_and_queue() {
184 let store = DataStore::new();
185 S7Server::set_mask(&store, 0xFFFF_FFFF);
186 assert_eq!(S7Server::get_mask(&store), 0xFFFF_FFFF);
187 S7Server::clear_events(&store);
188 assert!(S7Server::pick_event(&store).is_none());
189 }
190
191 #[tokio::test]
192 async fn client_count_increments_on_connect() {
193 let store = DataStore::new();
194 let server = S7Server::bind(make_config()).await.unwrap();
195 let addr = server.local_addr().unwrap();
196 tokio::spawn(server.serve(store.clone()));
197 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
198 let params = snap7_client::types::ConnectParams::default();
199 let _client = snap7_client::S7Client::connect(addr, params).await.unwrap();
200 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
201 assert_eq!(S7Server::get_status(&store).clients_count, 1);
202 }
203
204 #[tokio::test]
205 async fn server_write_then_read() {
206 let store = DataStore::new();
207
208 let server = S7Server::bind(make_config()).await.unwrap();
209 let addr = server.local_addr().unwrap();
210
211 tokio::spawn(server.serve(store));
212
213 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
214
215 let params = ConnectParams::default();
216 let client = S7Client::connect(addr, params).await.unwrap();
217
218 client.db_write(2, 10, &[0xAB, 0xCD]).await.unwrap();
219 let data = client.db_read(2, 10, 2).await.unwrap();
220 assert_eq!(&data[..], &[0xAB, 0xCD]);
221 }
222}