Skip to main content

snap7_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use tokio::net::{TcpListener, TcpStream};
4use tokio::sync::Semaphore;
5
6use crate::{
7    dispatch::dispatch_loop, error::Result, handshake::server_handshake, store::DataStore,
8};
9
10/// Configuration for [`S7Server`].
11#[derive(Debug, Clone)]
12pub struct ServerConfig {
13    pub bind_addr: SocketAddr,
14    pub max_connections: usize,
15}
16
17/// TCP listener that accepts connections and runs the full S7 pipeline per connection.
18pub struct S7Server {
19    listener: TcpListener,
20    semaphore: Arc<Semaphore>,
21}
22
23impl S7Server {
24    /// Bind a TCP listener at `config.bind_addr`.
25    pub async fn bind(config: ServerConfig) -> Result<Self> {
26        let listener = TcpListener::bind(config.bind_addr).await?;
27        let semaphore = Arc::new(Semaphore::new(config.max_connections));
28        Ok(Self {
29            listener,
30            semaphore,
31        })
32    }
33
34    /// Return the local address the server is listening on.
35    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
36        self.listener.local_addr()
37    }
38
39    /// Accept connections and serve them against `store` until an accept error occurs.
40    pub async fn serve(self, store: DataStore) -> Result<()> {
41        loop {
42            // Acquire a permit — blocks when at max_connections.
43            let permit = Arc::clone(&self.semaphore)
44                .acquire_owned()
45                .await
46                .expect("semaphore closed");
47
48            let (stream, _peer) = self.listener.accept().await?;
49            let store = store.clone();
50
51            tokio::spawn(async move {
52                let _permit = permit; // keep permit alive for connection lifetime
53                if let Err(e) = serve_one(stream, store).await {
54                    // Log connection errors at debug level; they are expected
55                    // (e.g., client disconnecting mid-handshake).
56                    let _ = e; // suppress unused-variable warning in release builds
57                }
58            });
59        }
60    }
61}
62
63/// Handle a single accepted connection: set TCP_NODELAY, run handshake, then dispatch loop.
64async fn serve_one(mut stream: TcpStream, store: DataStore) -> Result<()> {
65    stream.set_nodelay(true)?;
66    let pdu_size = server_handshake(&mut stream).await?;
67    dispatch_loop(&mut stream, pdu_size, store).await
68}
69
70// ---------------------------------------------------------------------------
71// Integration tests
72// ---------------------------------------------------------------------------
73
74#[cfg(test)]
75mod tests {
76    use std::net::SocketAddr;
77
78    use snap7_client::{types::ConnectParams, S7Client};
79
80    use super::*;
81    use crate::store::DataStore;
82
83    fn make_config() -> ServerConfig {
84        ServerConfig {
85            bind_addr: "127.0.0.1:0".parse::<SocketAddr>().unwrap(),
86            max_connections: 4,
87        }
88    }
89
90    #[tokio::test]
91    async fn server_accepts_s7client_connection() {
92        let store = DataStore::new();
93        store.write_bytes(1, 0, &[0x12, 0x34]);
94
95        let server = S7Server::bind(make_config()).await.unwrap();
96        let addr = server.local_addr().unwrap();
97
98        tokio::spawn(server.serve(store));
99
100        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
101
102        let params = ConnectParams {
103            rack: 0,
104            slot: 1,
105            ..ConnectParams::default()
106        };
107        let client = S7Client::connect(addr, params).await.unwrap();
108        let data = client.db_read(1, 0, 2).await.unwrap();
109        assert_eq!(&data[..], &[0x12, 0x34]);
110    }
111
112    #[tokio::test]
113    async fn server_write_then_read() {
114        let store = DataStore::new();
115
116        let server = S7Server::bind(make_config()).await.unwrap();
117        let addr = server.local_addr().unwrap();
118
119        tokio::spawn(server.serve(store));
120
121        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
122
123        let params = ConnectParams::default();
124        let client = S7Client::connect(addr, params).await.unwrap();
125
126        client.db_write(2, 10, &[0xAB, 0xCD]).await.unwrap();
127        let data = client.db_read(2, 10, 2).await.unwrap();
128        assert_eq!(&data[..], &[0xAB, 0xCD]);
129    }
130}