Skip to main content

snap7_server/
dispatch.rs

1use bytes::{Bytes, BytesMut};
2use snap7_client::proto::s7::{
3    header::{PduType, S7Header},
4    read_var::{DataItem, ReadVarRequest, ReadVarResponse, FUNC_READ_VAR},
5    write_var::{WriteVarRequest, WriteVarResponse, FUNC_WRITE_VAR},
6};
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use crate::{
10    error::Result,
11    handshake::{recv_cotp_data, send_cotp_data},
12    store::DataStore,
13};
14
15/// Run the S7 request dispatch loop over an established transport.
16///
17/// Reads COTP Data PDUs, decodes S7 requests, executes them against
18/// `store`, and sends S7 AckData responses. Runs until the transport
19/// closes (EOF) or a fatal I/O error occurs.
20pub async fn dispatch_loop<T>(mut transport: T, _pdu_size: u16, store: DataStore) -> Result<()>
21where
22    T: AsyncRead + AsyncWrite + Unpin,
23{
24    loop {
25        let mut payload = match recv_cotp_data(&mut transport).await {
26            Ok(p) => p,
27            Err(_) => return Ok(()), // EOF or transport closed — normal exit
28        };
29
30        let header = match S7Header::decode(&mut payload) {
31            Ok(h) => h,
32            Err(_) => {
33                send_error_response(&mut transport, 0, 0x81, 0x04).await?;
34                continue;
35            }
36        };
37
38        // Peek at the function code (first byte of param section)
39        if payload.is_empty() {
40            send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
41            continue;
42        }
43        let func = payload[0];
44
45        match func {
46            FUNC_READ_VAR => match handle_read_var(&mut payload, &store) {
47                Ok((item_count, response)) => {
48                    send_ack_data(
49                        &mut transport,
50                        header.pdu_ref,
51                        FUNC_READ_VAR,
52                        item_count,
53                        response,
54                    )
55                    .await?;
56                }
57                Err(()) => {
58                    send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
59                }
60            },
61            FUNC_WRITE_VAR => match handle_write_var(&mut payload, &store) {
62                Ok((item_count, response)) => {
63                    send_ack_data(
64                        &mut transport,
65                        header.pdu_ref,
66                        FUNC_WRITE_VAR,
67                        item_count,
68                        response,
69                    )
70                    .await?;
71                }
72                Err(()) => {
73                    send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
74                }
75            },
76            _ => {
77                send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
78            }
79        }
80    }
81}
82
83/// Decode a ReadVarRequest, perform reads from `store`, and return
84/// `(item_count, data_bytes)` for the AckData response.
85///
86/// Returns `Err(())` if the request cannot be decoded; the caller must send
87/// an error response rather than a success AckData.
88fn handle_read_var(payload: &mut Bytes, store: &DataStore) -> std::result::Result<(u8, Bytes), ()> {
89    let req = ReadVarRequest::decode(payload).map_err(|_| ())?;
90
91    let items: Vec<DataItem> = req
92        .items
93        .iter()
94        .map(|item| {
95            let data = store.read_bytes(item.db_number, item.start, item.length as u32);
96            DataItem {
97                return_code: 0xFF,
98                data: Bytes::from(data),
99            }
100        })
101        .collect();
102
103    let item_count = items.len() as u8;
104    let resp = ReadVarResponse { items };
105    let mut buf = BytesMut::new();
106    resp.encode(&mut buf);
107    Ok((item_count, buf.freeze()))
108}
109
110/// Decode a WriteVarRequest, perform writes to `store`, and return
111/// `(item_count, data_bytes)` for the AckData response.
112///
113/// Returns `Err(())` if the request cannot be decoded; the caller must send
114/// an error response rather than a success AckData.
115fn handle_write_var(
116    payload: &mut Bytes,
117    store: &DataStore,
118) -> std::result::Result<(u8, Bytes), ()> {
119    let req = WriteVarRequest::decode(payload).map_err(|_| ())?;
120
121    for item in &req.items {
122        store.write_bytes(item.address.db_number, item.address.start, &item.data);
123    }
124
125    let item_count = req.items.len() as u8;
126    let return_codes = vec![0xFF_u8; req.items.len()];
127    let resp = WriteVarResponse { return_codes };
128    // WriteVarResponse encodes as one return_code byte per item
129    let mut buf = BytesMut::new();
130    for &code in &resp.return_codes {
131        buf.extend_from_slice(&[code]);
132    }
133    Ok((item_count, buf.freeze()))
134}
135
136/// Send an S7 AckData response.
137///
138/// The param section contains two bytes: `func` (function echo) and
139/// `item_count` (number of items in the response data), matching what
140/// S7 clients expect (`param_len: 2`).
141async fn send_ack_data<T: AsyncWrite + Unpin>(
142    transport: &mut T,
143    pdu_ref: u16,
144    func: u8,
145    item_count: u8,
146    data: Bytes,
147) -> Result<()> {
148    let param: Bytes = Bytes::copy_from_slice(&[func, item_count]);
149    let header = S7Header {
150        pdu_type: PduType::AckData,
151        reserved: 0,
152        pdu_ref,
153        param_len: 2,
154        data_len: data.len() as u16,
155        error_class: Some(0),
156        error_code: Some(0),
157    };
158    let mut buf = BytesMut::new();
159    header.encode(&mut buf);
160    buf.extend_from_slice(&param);
161    buf.extend_from_slice(&data);
162    send_cotp_data(transport, buf.freeze()).await
163}
164
165/// Send an S7 AckData error response with empty param/data sections.
166async fn send_error_response<T: AsyncWrite + Unpin>(
167    transport: &mut T,
168    pdu_ref: u16,
169    error_class: u8,
170    error_code: u8,
171) -> Result<()> {
172    let header = S7Header {
173        pdu_type: PduType::AckData,
174        reserved: 0,
175        pdu_ref,
176        param_len: 0,
177        data_len: 0,
178        error_class: Some(error_class),
179        error_code: Some(error_code),
180    };
181    let mut buf = BytesMut::new();
182    header.encode(&mut buf);
183    send_cotp_data(transport, buf.freeze()).await
184}
185
186// ---------------------------------------------------------------------------
187// Tests
188// ---------------------------------------------------------------------------
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193    use bytes::{Buf, BytesMut};
194    use snap7_client::proto::{
195        cotp::CotpPdu,
196        s7::{
197            header::{Area, PduType, S7Header, TransportSize},
198            read_var::{AddressItem, ReadVarRequest},
199            write_var::{WriteItem, WriteVarRequest},
200        },
201        tpkt::TpktFrame,
202    };
203    use tokio::io::AsyncWriteExt;
204
205    use crate::store::DataStore;
206
207    /// Wrap an S7 payload in COTP Data + TPKT and write it to `writer`.
208    async fn write_s7_frame(writer: &mut (impl tokio::io::AsyncWrite + Unpin), s7_payload: Bytes) {
209        let dt = CotpPdu::Data {
210            tpdu_nr: 0,
211            last: true,
212            payload: s7_payload,
213        };
214        let mut cotp_buf = BytesMut::new();
215        dt.encode(&mut cotp_buf);
216        let tpkt = TpktFrame {
217            payload: cotp_buf.freeze(),
218        };
219        let mut buf = BytesMut::new();
220        tpkt.encode(&mut buf).unwrap();
221        writer.write_all(&buf).await.unwrap();
222    }
223
224    /// Read one TPKT+COTP Data frame from `reader` and return its S7 payload.
225    async fn read_s7_frame(reader: &mut (impl tokio::io::AsyncRead + Unpin)) -> Bytes {
226        use tokio::io::AsyncReadExt;
227        let mut header = [0u8; 4];
228        reader.read_exact(&mut header).await.unwrap();
229        let total = u16::from_be_bytes([header[2], header[3]]) as usize;
230        let mut body = vec![0u8; total - 4];
231        reader.read_exact(&mut body).await.unwrap();
232        let mut b = Bytes::from(body);
233        let pdu = CotpPdu::decode(&mut b).unwrap();
234        match pdu {
235            CotpPdu::Data { payload, .. } => payload,
236            _ => panic!("expected COTP Data PDU"),
237        }
238    }
239
240    fn make_read_request(db: u16, start: u32, length: u16, pdu_ref: u16) -> Bytes {
241        let header = S7Header {
242            pdu_type: PduType::Job,
243            reserved: 0,
244            pdu_ref,
245            param_len: 14, // 2 (func+count) + 12 (one address item)
246            data_len: 0,
247            error_class: None,
248            error_code: None,
249        };
250        let req = ReadVarRequest {
251            items: vec![AddressItem {
252                area: Area::DataBlock,
253                db_number: db,
254                start,
255                bit_offset: 0,
256                length,
257                transport: TransportSize::Byte,
258            }],
259        };
260        let mut buf = BytesMut::new();
261        header.encode(&mut buf);
262        req.encode(&mut buf);
263        buf.freeze()
264    }
265
266    fn make_write_request(db: u16, start: u32, data: &[u8], pdu_ref: u16) -> Bytes {
267        let item = WriteItem {
268            address: AddressItem {
269                area: Area::DataBlock,
270                db_number: db,
271                start,
272                bit_offset: 0,
273                length: data.len() as u16,
274                transport: TransportSize::Byte,
275            },
276            data: Bytes::copy_from_slice(data),
277        };
278        let req = WriteVarRequest { items: vec![item] };
279        let mut param_buf = BytesMut::new();
280        req.encode(&mut param_buf);
281        let param_len = param_buf.len() as u16;
282        let header = S7Header {
283            pdu_type: PduType::Job,
284            reserved: 0,
285            pdu_ref,
286            param_len,
287            data_len: 0,
288            error_class: None,
289            error_code: None,
290        };
291        let mut buf = BytesMut::new();
292        header.encode(&mut buf);
293        buf.extend_from_slice(&param_buf);
294        buf.freeze()
295    }
296
297    #[tokio::test]
298    async fn dispatch_read_var_returns_data() {
299        let store = DataStore::new();
300        store.write_bytes(1, 0, &[0xCA, 0xFE, 0xBA, 0xBE]);
301
302        let (server_io, mut client_io) = tokio::io::duplex(4096);
303
304        let store_clone = store.clone();
305        let server_task =
306            tokio::spawn(async move { dispatch_loop(server_io, 480, store_clone).await });
307
308        // Send ReadVar request
309        let s7_req = make_read_request(1, 0, 4, 1);
310        write_s7_frame(&mut client_io, s7_req).await;
311
312        // Read response
313        let s7_resp = read_s7_frame(&mut client_io).await;
314        let mut resp_bytes = s7_resp;
315        let resp_header = S7Header::decode(&mut resp_bytes).unwrap();
316        assert_eq!(resp_header.pdu_type, PduType::AckData);
317
318        // Skip param section (2 bytes: func code + item count)
319        resp_bytes.advance(2);
320
321        // Parse ReadVarResponse (1 item, 4 bytes)
322        let read_resp = ReadVarResponse::decode(&mut resp_bytes, 1).unwrap();
323        assert_eq!(read_resp.items.len(), 1);
324        assert_eq!(read_resp.items[0].data.as_ref(), &[0xCA, 0xFE, 0xBA, 0xBE]);
325
326        // Close the client end — server will exit loop
327        drop(client_io);
328        let _ = server_task.await;
329    }
330
331    #[tokio::test]
332    async fn dispatch_write_var_stores_data() {
333        let store = DataStore::new();
334
335        let (server_io, mut client_io) = tokio::io::duplex(4096);
336
337        let store_clone = store.clone();
338        let server_task =
339            tokio::spawn(async move { dispatch_loop(server_io, 480, store_clone).await });
340
341        // Send WriteVar request
342        let s7_req = make_write_request(2, 0, &[0x01, 0x02], 2);
343        write_s7_frame(&mut client_io, s7_req).await;
344
345        // Read AckData response
346        let s7_resp = read_s7_frame(&mut client_io).await;
347        let mut resp_bytes = s7_resp;
348        let resp_header = S7Header::decode(&mut resp_bytes).unwrap();
349        assert_eq!(resp_header.pdu_type, PduType::AckData);
350        assert_eq!(resp_header.error_class, Some(0));
351        assert_eq!(resp_header.error_code, Some(0));
352
353        // Close client, wait for server
354        drop(client_io);
355        let _ = server_task.await;
356
357        // Verify store has the written data
358        let stored = store.read_bytes(2, 0, 2);
359        assert_eq!(stored, vec![0x01, 0x02]);
360    }
361}