1use bytes::{Bytes, BytesMut};
2use snap7_client::proto::s7::{
3 header::{PduType, S7Header},
4 read_var::{DataItem, ReadVarRequest, ReadVarResponse, FUNC_READ_VAR},
5 write_var::{WriteVarRequest, WriteVarResponse, FUNC_WRITE_VAR},
6};
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use crate::{
10 error::Result,
11 handshake::{recv_cotp_data, send_cotp_data},
12 store::DataStore,
13};
14
15pub async fn dispatch_loop<T>(mut transport: T, _pdu_size: u16, store: DataStore) -> Result<()>
21where
22 T: AsyncRead + AsyncWrite + Unpin,
23{
24 loop {
25 let mut payload = match recv_cotp_data(&mut transport).await {
26 Ok(p) => p,
27 Err(_) => return Ok(()), };
29
30 let header = match S7Header::decode(&mut payload) {
31 Ok(h) => h,
32 Err(_) => {
33 send_error_response(&mut transport, 0, 0x81, 0x04).await?;
34 continue;
35 }
36 };
37
38 if payload.is_empty() {
40 send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
41 continue;
42 }
43 let func = payload[0];
44
45 match func {
46 FUNC_READ_VAR => match handle_read_var(&mut payload, &store) {
47 Ok((item_count, response)) => {
48 send_ack_data(
49 &mut transport,
50 header.pdu_ref,
51 FUNC_READ_VAR,
52 item_count,
53 response,
54 )
55 .await?;
56 }
57 Err(()) => {
58 send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
59 }
60 },
61 FUNC_WRITE_VAR => match handle_write_var(&mut payload, &store) {
62 Ok((item_count, response)) => {
63 send_ack_data(
64 &mut transport,
65 header.pdu_ref,
66 FUNC_WRITE_VAR,
67 item_count,
68 response,
69 )
70 .await?;
71 }
72 Err(()) => {
73 send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
74 }
75 },
76 _ => {
77 send_error_response(&mut transport, header.pdu_ref, 0x81, 0x04).await?;
78 }
79 }
80 }
81}
82
83fn handle_read_var(payload: &mut Bytes, store: &DataStore) -> std::result::Result<(u8, Bytes), ()> {
89 let req = ReadVarRequest::decode(payload).map_err(|_| ())?;
90
91 let items: Vec<DataItem> = req
92 .items
93 .iter()
94 .map(|item| {
95 let data = store.read_bytes(item.db_number, item.start, item.length as u32);
96 DataItem {
97 return_code: 0xFF,
98 data: Bytes::from(data),
99 }
100 })
101 .collect();
102
103 let item_count = items.len() as u8;
104 let resp = ReadVarResponse { items };
105 let mut buf = BytesMut::new();
106 resp.encode(&mut buf);
107 Ok((item_count, buf.freeze()))
108}
109
110fn handle_write_var(
116 payload: &mut Bytes,
117 store: &DataStore,
118) -> std::result::Result<(u8, Bytes), ()> {
119 let req = WriteVarRequest::decode(payload).map_err(|_| ())?;
120
121 for item in &req.items {
122 store.write_bytes(item.address.db_number, item.address.start, &item.data);
123 }
124
125 let item_count = req.items.len() as u8;
126 let return_codes = vec![0xFF_u8; req.items.len()];
127 let resp = WriteVarResponse { return_codes };
128 let mut buf = BytesMut::new();
130 for &code in &resp.return_codes {
131 buf.extend_from_slice(&[code]);
132 }
133 Ok((item_count, buf.freeze()))
134}
135
136async fn send_ack_data<T: AsyncWrite + Unpin>(
142 transport: &mut T,
143 pdu_ref: u16,
144 func: u8,
145 item_count: u8,
146 data: Bytes,
147) -> Result<()> {
148 let param: Bytes = Bytes::copy_from_slice(&[func, item_count]);
149 let header = S7Header {
150 pdu_type: PduType::AckData,
151 reserved: 0,
152 pdu_ref,
153 param_len: 2,
154 data_len: data.len() as u16,
155 error_class: Some(0),
156 error_code: Some(0),
157 };
158 let mut buf = BytesMut::new();
159 header.encode(&mut buf);
160 buf.extend_from_slice(¶m);
161 buf.extend_from_slice(&data);
162 send_cotp_data(transport, buf.freeze()).await
163}
164
165async fn send_error_response<T: AsyncWrite + Unpin>(
167 transport: &mut T,
168 pdu_ref: u16,
169 error_class: u8,
170 error_code: u8,
171) -> Result<()> {
172 let header = S7Header {
173 pdu_type: PduType::AckData,
174 reserved: 0,
175 pdu_ref,
176 param_len: 0,
177 data_len: 0,
178 error_class: Some(error_class),
179 error_code: Some(error_code),
180 };
181 let mut buf = BytesMut::new();
182 header.encode(&mut buf);
183 send_cotp_data(transport, buf.freeze()).await
184}
185
186#[cfg(test)]
191mod tests {
192 use super::*;
193 use bytes::{Buf, BytesMut};
194 use snap7_client::proto::{
195 cotp::CotpPdu,
196 s7::{
197 header::{Area, PduType, S7Header, TransportSize},
198 read_var::{AddressItem, ReadVarRequest},
199 write_var::{WriteItem, WriteVarRequest},
200 },
201 tpkt::TpktFrame,
202 };
203 use tokio::io::AsyncWriteExt;
204
205 use crate::store::DataStore;
206
207 async fn write_s7_frame(writer: &mut (impl tokio::io::AsyncWrite + Unpin), s7_payload: Bytes) {
209 let dt = CotpPdu::Data {
210 tpdu_nr: 0,
211 last: true,
212 payload: s7_payload,
213 };
214 let mut cotp_buf = BytesMut::new();
215 dt.encode(&mut cotp_buf);
216 let tpkt = TpktFrame {
217 payload: cotp_buf.freeze(),
218 };
219 let mut buf = BytesMut::new();
220 tpkt.encode(&mut buf).unwrap();
221 writer.write_all(&buf).await.unwrap();
222 }
223
224 async fn read_s7_frame(reader: &mut (impl tokio::io::AsyncRead + Unpin)) -> Bytes {
226 use tokio::io::AsyncReadExt;
227 let mut header = [0u8; 4];
228 reader.read_exact(&mut header).await.unwrap();
229 let total = u16::from_be_bytes([header[2], header[3]]) as usize;
230 let mut body = vec![0u8; total - 4];
231 reader.read_exact(&mut body).await.unwrap();
232 let mut b = Bytes::from(body);
233 let pdu = CotpPdu::decode(&mut b).unwrap();
234 match pdu {
235 CotpPdu::Data { payload, .. } => payload,
236 _ => panic!("expected COTP Data PDU"),
237 }
238 }
239
240 fn make_read_request(db: u16, start: u32, length: u16, pdu_ref: u16) -> Bytes {
241 let header = S7Header {
242 pdu_type: PduType::Job,
243 reserved: 0,
244 pdu_ref,
245 param_len: 14, data_len: 0,
247 error_class: None,
248 error_code: None,
249 };
250 let req = ReadVarRequest {
251 items: vec![AddressItem {
252 area: Area::DataBlock,
253 db_number: db,
254 start,
255 bit_offset: 0,
256 length,
257 transport: TransportSize::Byte,
258 }],
259 };
260 let mut buf = BytesMut::new();
261 header.encode(&mut buf);
262 req.encode(&mut buf);
263 buf.freeze()
264 }
265
266 fn make_write_request(db: u16, start: u32, data: &[u8], pdu_ref: u16) -> Bytes {
267 let item = WriteItem {
268 address: AddressItem {
269 area: Area::DataBlock,
270 db_number: db,
271 start,
272 bit_offset: 0,
273 length: data.len() as u16,
274 transport: TransportSize::Byte,
275 },
276 data: Bytes::copy_from_slice(data),
277 };
278 let req = WriteVarRequest { items: vec![item] };
279 let mut param_buf = BytesMut::new();
280 req.encode(&mut param_buf);
281 let param_len = param_buf.len() as u16;
282 let header = S7Header {
283 pdu_type: PduType::Job,
284 reserved: 0,
285 pdu_ref,
286 param_len,
287 data_len: 0,
288 error_class: None,
289 error_code: None,
290 };
291 let mut buf = BytesMut::new();
292 header.encode(&mut buf);
293 buf.extend_from_slice(¶m_buf);
294 buf.freeze()
295 }
296
297 #[tokio::test]
298 async fn dispatch_read_var_returns_data() {
299 let store = DataStore::new();
300 store.write_bytes(1, 0, &[0xCA, 0xFE, 0xBA, 0xBE]);
301
302 let (server_io, mut client_io) = tokio::io::duplex(4096);
303
304 let store_clone = store.clone();
305 let server_task =
306 tokio::spawn(async move { dispatch_loop(server_io, 480, store_clone).await });
307
308 let s7_req = make_read_request(1, 0, 4, 1);
310 write_s7_frame(&mut client_io, s7_req).await;
311
312 let s7_resp = read_s7_frame(&mut client_io).await;
314 let mut resp_bytes = s7_resp;
315 let resp_header = S7Header::decode(&mut resp_bytes).unwrap();
316 assert_eq!(resp_header.pdu_type, PduType::AckData);
317
318 resp_bytes.advance(2);
320
321 let read_resp = ReadVarResponse::decode(&mut resp_bytes, 1).unwrap();
323 assert_eq!(read_resp.items.len(), 1);
324 assert_eq!(read_resp.items[0].data.as_ref(), &[0xCA, 0xFE, 0xBA, 0xBE]);
325
326 drop(client_io);
328 let _ = server_task.await;
329 }
330
331 #[tokio::test]
332 async fn dispatch_write_var_stores_data() {
333 let store = DataStore::new();
334
335 let (server_io, mut client_io) = tokio::io::duplex(4096);
336
337 let store_clone = store.clone();
338 let server_task =
339 tokio::spawn(async move { dispatch_loop(server_io, 480, store_clone).await });
340
341 let s7_req = make_write_request(2, 0, &[0x01, 0x02], 2);
343 write_s7_frame(&mut client_io, s7_req).await;
344
345 let s7_resp = read_s7_frame(&mut client_io).await;
347 let mut resp_bytes = s7_resp;
348 let resp_header = S7Header::decode(&mut resp_bytes).unwrap();
349 assert_eq!(resp_header.pdu_type, PduType::AckData);
350 assert_eq!(resp_header.error_class, Some(0));
351 assert_eq!(resp_header.error_code, Some(0));
352
353 drop(client_io);
355 let _ = server_task.await;
356
357 let stored = store.read_bytes(2, 0, 2);
359 assert_eq!(stored, vec![0x01, 0x02]);
360 }
361}