Skip to main content

snap7_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use tokio::sync::Semaphore;
5
6use crate::{
7    dispatch::dispatch_loop,
8    error::Result,
9    handshake::server_handshake,
10    store::{CpuState, DataStore, EventInfo, ServerStatus},
11};
12
/// Configuration for [`S7Server`].
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that
/// configurations can be compared directly (e.g. in tests or when detecting
/// configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Socket address the TCP listener is bound to.
    pub bind_addr: SocketAddr,
    /// Number of permits in the connection semaphore, i.e. the maximum number
    /// of connections that are served at the same time.
    pub max_connections: usize,
}
19
20/// TCP listener that accepts connections and runs the full S7 pipeline per connection.
21pub struct S7Server {
22    listener: TcpListener,
23    semaphore: Arc<Semaphore>,
24}
25
26impl S7Server {
27    /// Bind a TCP listener at `config.bind_addr`.
28    pub async fn bind(config: ServerConfig) -> Result<Self> {
29        let listener = TcpListener::bind(config.bind_addr).await?;
30        let semaphore = Arc::new(Semaphore::new(config.max_connections));
31        Ok(Self { listener, semaphore })
32    }
33
34    /// Bind to a specific address string (e.g. `"0.0.0.0:102"`).
35    pub async fn start_to(addr: &str, max_connections: usize) -> Result<Self> {
36        let bind_addr: SocketAddr = addr.parse().map_err(|e| {
37            crate::error::Error::Io(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
38        })?;
39        Self::bind(ServerConfig { bind_addr, max_connections }).await
40    }
41
42    /// Return the local address the server is listening on.
43    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
44        self.listener.local_addr()
45    }
46
47    /// Accept connections and serve them against `store` until an accept error occurs.
48    pub async fn serve(self, store: DataStore) -> Result<()> {
49        loop {
50            let permit = Arc::clone(&self.semaphore)
51                .acquire_owned()
52                .await
53                .expect("semaphore closed");
54
55            let (stream, _peer) = self.listener.accept().await?;
56            let store = store.clone();
57
58            tokio::spawn(async move {
59                let _permit = permit;
60                store.client_connected();
61                let _ = serve_one(stream, store.clone()).await;
62                store.client_disconnected();
63            });
64        }
65    }
66
67    // -- Delegated store management API (mirrors C snap7 Srv_* functions) ----
68
69    /// Return server/CPU status and connected client count.
70    pub fn get_status(store: &DataStore) -> ServerStatus {
71        store.get_status()
72    }
73
74    /// Set the simulated CPU state.
75    pub fn set_cpu_status(store: &DataStore, state: CpuState) {
76        store.set_cpu_state(state);
77    }
78
79    /// Lock an area: writes to this area are silently ignored until unlocked.
80    pub fn lock_area(store: &DataStore, area_code: u8) {
81        store.lock_area(area_code);
82    }
83
84    /// Unlock a previously locked area.
85    pub fn unlock_area(store: &DataStore, area_code: u8) {
86        store.unlock_area(area_code);
87    }
88
89    /// Drain the event queue.
90    pub fn clear_events(store: &DataStore) {
91        store.clear_events();
92    }
93
94    /// Pop the oldest event from the queue. Returns `None` when empty.
95    pub fn pick_event(store: &DataStore) -> Option<EventInfo> {
96        store.pick_event()
97    }
98
99    /// Get the current event filter mask.
100    pub fn get_mask(store: &DataStore) -> u32 {
101        store.get_mask()
102    }
103
104    /// Set the event filter mask.
105    pub fn set_mask(store: &DataStore, mask: u32) {
106        store.set_mask(mask);
107    }
108}
109
110/// Handle a single accepted connection: set TCP_NODELAY, run handshake, then dispatch loop.
111async fn serve_one(mut stream: TcpStream, store: DataStore) -> Result<()> {
112    stream.set_nodelay(true)?;
113    let pdu_size = server_handshake(&mut stream).await?;
114    dispatch_loop(&mut stream, pdu_size, store).await
115}
116
117// ---------------------------------------------------------------------------
118// Integration tests
119// ---------------------------------------------------------------------------
120
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::time::Duration;

    use snap7_client::{types::ConnectParams, S7Client};

    use super::*;
    use crate::store::{area, CpuState, DataStore};

    /// Loopback configuration on an OS-assigned port with four permits.
    fn make_config() -> ServerConfig {
        ServerConfig {
            bind_addr: "127.0.0.1:0".parse::<SocketAddr>().unwrap(),
            max_connections: 4,
        }
    }

    /// Bind a server, run it against `store` on a background task, wait
    /// briefly, and return the address clients should connect to.
    async fn spawn_server(store: DataStore) -> SocketAddr {
        let server = S7Server::bind(make_config()).await.unwrap();
        let listen_addr = server.local_addr().unwrap();

        tokio::spawn(server.serve(store));
        tokio::time::sleep(Duration::from_millis(20)).await;

        listen_addr
    }

    /// A client can connect and read back bytes that were pre-loaded into the store.
    #[tokio::test]
    async fn server_accepts_s7client_connection() {
        let store = DataStore::new();
        store.write_bytes(1, 0, &[0x12, 0x34]);

        let addr = spawn_server(store).await;

        let params = ConnectParams {
            rack: 0,
            slot: 1,
            ..ConnectParams::default()
        };
        let client = S7Client::connect(addr, params).await.unwrap();
        let bytes = client.db_read(1, 0, 2).await.unwrap();
        assert_eq!(&bytes[..], &[0x12, 0x34]);
    }

    /// A fresh store reports a stopped CPU, no clients and a running server.
    #[test]
    fn get_status_reflects_cpu_state() {
        let store = DataStore::new();
        let status = S7Server::get_status(&store);
        assert_eq!(status.cpu_state, CpuState::Stop);
        assert_eq!(status.clients_count, 0);
        assert!(status.server_running);
    }

    /// Writes to a locked area are dropped; after unlocking they apply again.
    #[test]
    fn lock_area_blocks_writes() {
        let store = DataStore::new();
        store.write_bytes(1, 0, &[0xAA]);

        S7Server::lock_area(&store, area::DATA_BLOCK);
        store.write_bytes(1, 0, &[0xFF]); // should be silently ignored
        let while_locked = store.read_bytes(1, 0, 1);
        assert_eq!(while_locked, vec![0xAA]);

        S7Server::unlock_area(&store, area::DATA_BLOCK);
        store.write_bytes(1, 0, &[0xFF]);
        let after_unlock = store.read_bytes(1, 0, 1);
        assert_eq!(after_unlock, vec![0xFF]);
    }

    /// The mask round-trips, and a cleared queue yields no event.
    #[test]
    fn event_mask_and_queue() {
        let store = DataStore::new();
        S7Server::set_mask(&store, 0xFFFF_FFFF);
        assert_eq!(S7Server::get_mask(&store), 0xFFFF_FFFF);
        S7Server::clear_events(&store);
        assert!(S7Server::pick_event(&store).is_none());
    }

    /// One connected client is reflected in the reported client count.
    #[tokio::test]
    async fn client_count_increments_on_connect() {
        let store = DataStore::new();
        let addr = spawn_server(store.clone()).await;

        let params = ConnectParams::default();
        // Keep the client alive until after the assertion.
        let _client = S7Client::connect(addr, params).await.unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;

        assert_eq!(S7Server::get_status(&store).clients_count, 1);
    }

    /// Bytes written through a client can be read back through the same client.
    #[tokio::test]
    async fn server_write_then_read() {
        let store = DataStore::new();
        let addr = spawn_server(store).await;

        let params = ConnectParams::default();
        let client = S7Client::connect(addr, params).await.unwrap();

        client.db_write(2, 10, &[0xAB, 0xCD]).await.unwrap();
        let bytes = client.db_read(2, 10, 2).await.unwrap();
        assert_eq!(&bytes[..], &[0xAB, 0xCD]);
    }
}