1use bytes::{Buf, Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::{
7 cotp::CotpPdu,
8 s7::{
9 clock::PlcDateTime,
10 header::{Area, PduType, S7Header, TransportSize},
11 read_var::{AddressItem, ReadVarRequest, ReadVarResponse},
12 szl::{SzlRequest, SzlResponse},
13 write_var::{WriteItem, WriteVarRequest, WriteVarResponse},
14 },
15 tpkt::TpktFrame,
16};
17
18use crate::{
19 connection::{connect, Connection},
20 error::{Error, Result},
21 types::ConnectParams,
22};
23
24#[derive(Debug, Clone)]
26pub struct MultiReadItem {
27 pub area: Area,
28 pub db_number: u16,
29 pub start: u32,
30 pub length: u16,
31 pub transport: TransportSize,
32}
33
34impl MultiReadItem {
35 pub fn db(db: u16, start: u32, length: u16) -> Self {
37 Self {
38 area: Area::DataBlock,
39 db_number: db,
40 start,
41 length,
42 transport: TransportSize::Byte,
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct MultiWriteItem {
50 pub area: Area,
51 pub db_number: u16,
52 pub start: u32,
53 pub data: Bytes,
54}
55
56impl MultiWriteItem {
57 pub fn db(db: u16, start: u32, data: impl Into<Bytes>) -> Self {
59 Self {
60 area: Area::DataBlock,
61 db_number: db,
62 start,
63 data: data.into(),
64 }
65 }
66}
67
68struct Inner<T> {
69 transport: T,
70 connection: Connection,
71 pdu_ref: u16,
72}
73
74pub struct S7Client<T: AsyncRead + AsyncWrite + Unpin + Send> {
75 inner: Mutex<Inner<T>>,
76 #[allow(dead_code)]
77 params: ConnectParams,
78}
79
80impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7Client<T> {
81 pub async fn from_transport(transport: T, params: ConnectParams) -> Result<Self> {
82 let mut t = transport;
83 let connection = connect(&mut t, ¶ms).await?;
84 Ok(S7Client {
85 inner: Mutex::new(Inner {
86 transport: t,
87 connection,
88 pdu_ref: 1,
89 }),
90 params,
91 })
92 }
93
94 fn next_pdu_ref(inner: &mut Inner<T>) -> u16 {
95 inner.pdu_ref = inner.pdu_ref.wrapping_add(1);
96 inner.pdu_ref
97 }
98
99 async fn send_s7(
100 inner: &mut Inner<T>,
101 param_buf: Bytes,
102 data_buf: Bytes,
103 pdu_ref: u16,
104 pdu_type: PduType,
105 ) -> Result<()> {
106 let header = S7Header {
107 pdu_type,
108 reserved: 0,
109 pdu_ref,
110 param_len: param_buf.len() as u16,
111 data_len: data_buf.len() as u16,
112 error_class: None,
113 error_code: None,
114 };
115 let mut s7b = BytesMut::new();
116 header.encode(&mut s7b);
117 s7b.extend_from_slice(¶m_buf);
118 s7b.extend_from_slice(&data_buf);
119
120 let dt = CotpPdu::Data {
121 tpdu_nr: 0,
122 last: true,
123 payload: s7b.freeze(),
124 };
125 let mut cotpb = BytesMut::new();
126 dt.encode(&mut cotpb);
127 let tpkt = TpktFrame {
128 payload: cotpb.freeze(),
129 };
130 let mut tb = BytesMut::new();
131 tpkt.encode(&mut tb)?;
132 inner.transport.write_all(&tb).await?;
133 Ok(())
134 }
135
136 async fn recv_s7(inner: &mut Inner<T>) -> Result<(S7Header, Bytes)> {
137 let mut tpkt_hdr = [0u8; 4];
138 inner.transport.read_exact(&mut tpkt_hdr).await?;
139 let total = u16::from_be_bytes([tpkt_hdr[2], tpkt_hdr[3]]) as usize;
140 if total < 4 {
141 return Err(Error::UnexpectedResponse);
142 }
143 let mut payload = vec![0u8; total - 4];
144 inner.transport.read_exact(&mut payload).await?;
145 let mut b = Bytes::from(payload);
146
147 if b.remaining() < 3 {
149 return Err(Error::UnexpectedResponse);
150 }
151 let _li = b.get_u8();
152 let cotp_code = b.get_u8();
153 if cotp_code != 0xF0 {
154 return Err(Error::UnexpectedResponse);
155 }
156 b.advance(1); let header = S7Header::decode(&mut b)?;
159 Ok((header, b))
160 }
161
162 pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes> {
163 let mut inner = self.inner.lock().await;
164 let pdu_ref = Self::next_pdu_ref(&mut inner);
165
166 let req = ReadVarRequest {
167 items: vec![AddressItem {
168 area: Area::DataBlock,
169 db_number: db,
170 start,
171 bit_offset: 0,
172 length,
173 transport: TransportSize::Byte,
174 }],
175 };
176 let mut param_buf = BytesMut::new();
177 req.encode(&mut param_buf);
178
179 Self::send_s7(
180 &mut inner,
181 param_buf.freeze(),
182 Bytes::new(),
183 pdu_ref,
184 PduType::Job,
185 )
186 .await?;
187
188 let (header, mut body) = Self::recv_s7(&mut inner).await?;
189 check_plc_error(&header, "db_read")?;
190 if body.remaining() >= 2 {
191 body.advance(2); }
193 let resp = ReadVarResponse::decode(&mut body, 1)?;
194 if resp.items.is_empty() {
195 return Err(Error::UnexpectedResponse);
196 }
197 if resp.items[0].return_code != 0xFF {
198 return Err(Error::PlcError {
199 code: resp.items[0].return_code as u32,
200 message: "item error".into(),
201 });
202 }
203 Ok(resp.items[0].data.clone())
204 }
205
206 pub async fn read_multi_vars(&self, items: &[MultiReadItem]) -> Result<Vec<Bytes>> {
214 if items.is_empty() {
215 return Ok(Vec::new());
216 }
217
218 const S7_HEADER: usize = 10;
221 const PARAM_OVERHEAD: usize = 2; const ADDR_ITEM_SIZE: usize = 12;
223 const DATA_ITEM_OVERHEAD: usize = 4;
225 const MAX_ITEMS_PER_PDU: usize = 20;
226
227 let mut inner = self.inner.lock().await;
228 let pdu_size = inner.connection.pdu_size as usize;
229 let max_req_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
230 let max_resp_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
231
232 let mut results = vec![Bytes::new(); items.len()];
233 let mut batch_start = 0;
234
235 while batch_start < items.len() {
236 let mut batch_end = batch_start;
238 let mut req_bytes_used = 0usize;
239 let mut resp_bytes_used = 0usize;
240
241 while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
242 let item = &items[batch_end];
243 let item_resp_size =
244 DATA_ITEM_OVERHEAD + item.length as usize + (item.length as usize % 2);
245
246 if batch_end > batch_start
247 && (req_bytes_used + ADDR_ITEM_SIZE > max_req_payload
248 || resp_bytes_used + item_resp_size > max_resp_payload)
249 {
250 break;
251 }
252 req_bytes_used += ADDR_ITEM_SIZE;
253 resp_bytes_used += item_resp_size;
254 batch_end += 1;
255 }
256
257 let batch = &items[batch_start..batch_end];
258 let pdu_ref = Self::next_pdu_ref(&mut inner);
259
260 let req = ReadVarRequest {
261 items: batch
262 .iter()
263 .map(|item| AddressItem {
264 area: item.area,
265 db_number: item.db_number,
266 start: item.start,
267 bit_offset: 0,
268 length: item.length,
271 transport: TransportSize::Byte,
272 })
273 .collect(),
274 };
275 let mut param_buf = BytesMut::new();
276 req.encode(&mut param_buf);
277
278 Self::send_s7(
279 &mut inner,
280 param_buf.freeze(),
281 Bytes::new(),
282 pdu_ref,
283 PduType::Job,
284 )
285 .await?;
286
287 let (header, mut body) = Self::recv_s7(&mut inner).await?;
288 check_plc_error(&header, "read_multi_vars")?;
289 if body.remaining() >= 2 {
290 body.advance(2); }
292 let resp = ReadVarResponse::decode(&mut body, batch.len())?;
293
294 for (i, item) in resp.items.into_iter().enumerate() {
295 if item.return_code != 0xFF {
296 return Err(Error::PlcError {
297 code: item.return_code as u32,
298 message: format!("item {} error", batch_start + i),
299 });
300 }
301 results[batch_start + i] = item.data;
302 }
303
304 batch_start = batch_end;
305 }
306
307 Ok(results)
308 }
309
310 pub async fn write_multi_vars(&self, items: &[MultiWriteItem]) -> Result<()> {
316 if items.is_empty() {
317 return Ok(());
318 }
319
320 const S7_HEADER: usize = 10;
321 const PARAM_OVERHEAD: usize = 2; const ADDR_ITEM_SIZE: usize = 12;
323 const DATA_ITEM_OVERHEAD: usize = 4; const MAX_ITEMS_PER_PDU: usize = 20;
325
326 let mut inner = self.inner.lock().await;
327 let pdu_size = inner.connection.pdu_size as usize;
328 let max_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
329
330 let mut batch_start = 0;
331
332 while batch_start < items.len() {
333 let mut batch_end = batch_start;
334 let mut bytes_used = 0usize;
335
336 while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
337 let item = &items[batch_end];
338 let data_len = item.data.len();
339 let item_size = ADDR_ITEM_SIZE + DATA_ITEM_OVERHEAD + data_len + (data_len % 2);
340
341 if batch_end > batch_start && bytes_used + item_size > max_payload {
342 break;
343 }
344 bytes_used += item_size;
345 batch_end += 1;
346 }
347
348 let batch = &items[batch_start..batch_end];
349 let pdu_ref = Self::next_pdu_ref(&mut inner);
350
351 let req = WriteVarRequest {
352 items: batch
353 .iter()
354 .map(|item| WriteItem {
355 address: AddressItem {
356 area: item.area,
357 db_number: item.db_number,
358 start: item.start,
359 bit_offset: 0,
360 length: item.data.len() as u16,
361 transport: TransportSize::Byte,
362 },
363 data: item.data.clone(),
364 })
365 .collect(),
366 };
367 let mut param_buf = BytesMut::new();
368 req.encode(&mut param_buf);
369
370 Self::send_s7(
371 &mut inner,
372 param_buf.freeze(),
373 Bytes::new(),
374 pdu_ref,
375 PduType::Job,
376 )
377 .await?;
378
379 let (header, mut body) = Self::recv_s7(&mut inner).await?;
380 check_plc_error(&header, "write_multi_vars")?;
381 if body.remaining() >= 2 {
382 body.advance(2); }
384 let resp = WriteVarResponse::decode(&mut body, batch.len())?;
385 for (i, &code) in resp.return_codes.iter().enumerate() {
386 if code != 0xFF {
387 return Err(Error::PlcError {
388 code: code as u32,
389 message: format!("item {} write error", batch_start + i),
390 });
391 }
392 }
393
394 batch_start = batch_end;
395 }
396
397 Ok(())
398 }
399
400 pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<()> {
401 let mut inner = self.inner.lock().await;
402 let pdu_ref = Self::next_pdu_ref(&mut inner);
403
404 let req = WriteVarRequest {
405 items: vec![WriteItem {
406 address: AddressItem {
407 area: Area::DataBlock,
408 db_number: db,
409 start,
410 bit_offset: 0,
411 length: data.len() as u16,
412 transport: TransportSize::Byte,
413 },
414 data: Bytes::copy_from_slice(data),
415 }],
416 };
417 let mut param_buf = BytesMut::new();
418 req.encode(&mut param_buf);
419
420 Self::send_s7(
421 &mut inner,
422 param_buf.freeze(),
423 Bytes::new(),
424 pdu_ref,
425 PduType::Job,
426 )
427 .await?;
428
429 let (header, mut body) = Self::recv_s7(&mut inner).await?;
430 check_plc_error(&header, "db_write")?;
431 if body.has_remaining() {
432 body.advance(2); }
434 let resp = WriteVarResponse::decode(&mut body, 1)?;
435 if resp.return_codes[0] != 0xFF {
436 return Err(Error::PlcError {
437 code: resp.return_codes[0] as u32,
438 message: "write error".into(),
439 });
440 }
441 Ok(())
442 }
443
444 pub async fn read_szl(&self, szl_id: u16, szl_index: u16) -> Result<SzlResponse> {
445 let mut inner = self.inner.lock().await;
446 let pdu_ref = Self::next_pdu_ref(&mut inner);
447
448 let req = SzlRequest { szl_id, szl_index };
449 let mut param_buf = BytesMut::new();
450 req.encode(&mut param_buf);
451
452 Self::send_s7(
453 &mut inner,
454 param_buf.freeze(),
455 Bytes::new(),
456 pdu_ref,
457 PduType::UserData,
458 )
459 .await?;
460
461 let (_header, mut body) = Self::recv_s7(&mut inner).await?;
462 if body.remaining() > 12 {
463 body.advance(body.remaining() - 12);
464 }
465 Ok(SzlResponse::decode(&mut body)?)
466 }
467
468 pub async fn read_clock(&self) -> Result<PlcDateTime> {
469 let mut inner = self.inner.lock().await;
470 let pdu_ref = Self::next_pdu_ref(&mut inner);
471 let mut param_buf = BytesMut::new();
472 param_buf.extend_from_slice(&[0x00, 0x01, 0x12, 0x04, 0xF5, 0x00]);
473 Self::send_s7(
474 &mut inner,
475 param_buf.freeze(),
476 Bytes::new(),
477 pdu_ref,
478 PduType::UserData,
479 )
480 .await?;
481 let (_header, mut body) = Self::recv_s7(&mut inner).await?;
482 if body.remaining() > 8 {
483 body.advance(body.remaining() - 8);
484 }
485 Ok(PlcDateTime::decode(&mut body)?)
486 }
487}
488
489fn check_plc_error(header: &S7Header, context: &str) -> Result<()> {
490 if let (Some(ec), Some(ecd)) = (header.error_class, header.error_code) {
491 if ec != 0 || ecd != 0 {
492 return Err(Error::PlcError {
493 code: ((ec as u32) << 8) | ecd as u32,
494 message: format!("{} error", context),
495 });
496 }
497 }
498 Ok(())
499}
500
501impl S7Client<crate::transport::TcpTransport> {
502 pub async fn connect(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
503 let transport =
504 crate::transport::TcpTransport::connect(addr, params.connect_timeout).await?;
505 Self::from_transport(transport, params).await
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512 use bytes::BufMut;
513 use crate::proto::{
514 cotp::CotpPdu,
515 s7::{
516 header::{PduType, S7Header},
517 negotiate::NegotiateResponse,
518 },
519 tpkt::TpktFrame,
520 };
521 use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
522
523 async fn mock_plc_db_read(mut server_io: tokio::io::DuplexStream, response_data: Vec<u8>) {
524 let mut buf = vec![0u8; 4096];
525
526 let _ = server_io.read(&mut buf).await;
528 let cc = CotpPdu::ConnectConfirm {
529 dst_ref: 1,
530 src_ref: 1,
531 };
532 let mut cb = BytesMut::new();
533 cc.encode(&mut cb);
534 let mut tb = BytesMut::new();
535 TpktFrame {
536 payload: cb.freeze(),
537 }
538 .encode(&mut tb)
539 .unwrap();
540 server_io.write_all(&tb).await.unwrap();
541
542 let _ = server_io.read(&mut buf).await;
544 let neg = NegotiateResponse {
545 max_amq_calling: 1,
546 max_amq_called: 1,
547 pdu_length: 480,
548 };
549 let mut s7b = BytesMut::new();
550 S7Header {
551 pdu_type: PduType::AckData,
552 reserved: 0,
553 pdu_ref: 1,
554 param_len: 8,
555 data_len: 0,
556 error_class: Some(0),
557 error_code: Some(0),
558 }
559 .encode(&mut s7b);
560 neg.encode(&mut s7b);
561 let dt = CotpPdu::Data {
562 tpdu_nr: 0,
563 last: true,
564 payload: s7b.freeze(),
565 };
566 let mut cb = BytesMut::new();
567 dt.encode(&mut cb);
568 let mut tb = BytesMut::new();
569 TpktFrame {
570 payload: cb.freeze(),
571 }
572 .encode(&mut tb)
573 .unwrap();
574 server_io.write_all(&tb).await.unwrap();
575
576 let _ = server_io.read(&mut buf).await;
578 let mut s7b = BytesMut::new();
579 S7Header {
580 pdu_type: PduType::AckData,
581 reserved: 0,
582 pdu_ref: 2,
583 param_len: 2,
584 data_len: (4 + response_data.len()) as u16,
585 error_class: Some(0),
586 error_code: Some(0),
587 }
588 .encode(&mut s7b);
589 s7b.extend_from_slice(&[0x04, 0x01]); s7b.put_u8(0xFF); s7b.put_u8(0x04); s7b.put_u16((response_data.len() * 8) as u16);
593 s7b.extend_from_slice(&response_data);
594 let dt = CotpPdu::Data {
595 tpdu_nr: 0,
596 last: true,
597 payload: s7b.freeze(),
598 };
599 let mut cb = BytesMut::new();
600 dt.encode(&mut cb);
601 let mut tb = BytesMut::new();
602 TpktFrame {
603 payload: cb.freeze(),
604 }
605 .encode(&mut tb)
606 .unwrap();
607 server_io.write_all(&tb).await.unwrap();
608 }
609
610 #[tokio::test]
611 async fn db_read_returns_data() {
612 let (client_io, server_io) = duplex(4096);
613 let params = ConnectParams::default();
614 let expected = vec![0xDE, 0xAD, 0xBE, 0xEF];
615 tokio::spawn(mock_plc_db_read(server_io, expected.clone()));
616 let client = S7Client::from_transport(client_io, params).await.unwrap();
617 let data = client.db_read(1, 0, 4).await.unwrap();
618 assert_eq!(&data[..], &expected[..]);
619 }
620
621 async fn mock_plc_multi_read(
623 mut server_io: tokio::io::DuplexStream,
624 items: Vec<Vec<u8>>, ) {
626 let mut buf = vec![0u8; 4096];
627
628 let _ = server_io.read(&mut buf).await;
630 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
631 let mut cb = BytesMut::new();
632 cc.encode(&mut cb);
633 let mut tb = BytesMut::new();
634 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
635 server_io.write_all(&tb).await.unwrap();
636
637 let _ = server_io.read(&mut buf).await;
639 let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
640 let mut s7b = BytesMut::new();
641 S7Header {
642 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
643 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
644 }.encode(&mut s7b);
645 neg.encode(&mut s7b);
646 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
647 let mut cb = BytesMut::new(); dt.encode(&mut cb);
648 let mut tb = BytesMut::new();
649 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
650 server_io.write_all(&tb).await.unwrap();
651
652 let _ = server_io.read(&mut buf).await;
654
655 let item_count = items.len() as u8;
657 let mut data_bytes = BytesMut::new();
658 for item_data in &items {
659 data_bytes.put_u8(0xFF); data_bytes.put_u8(0x04); data_bytes.put_u16((item_data.len() * 8) as u16);
662 data_bytes.extend_from_slice(item_data);
663 if item_data.len() % 2 != 0 {
664 data_bytes.put_u8(0x00); }
666 }
667 let data_len = data_bytes.len() as u16;
668 let mut s7b = BytesMut::new();
669 S7Header {
670 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
671 param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
672 }.encode(&mut s7b);
673 s7b.extend_from_slice(&[0x04, item_count]); s7b.extend_from_slice(&data_bytes);
675
676 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
677 let mut cb = BytesMut::new(); dt.encode(&mut cb);
678 let mut tb = BytesMut::new();
679 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
680 server_io.write_all(&tb).await.unwrap();
681 }
682
683 #[tokio::test]
684 async fn read_multi_vars_returns_all_items() {
685 let (client_io, server_io) = duplex(4096);
686 let params = ConnectParams::default();
687 let item1 = vec![0xDE, 0xAD, 0xBE, 0xEF];
688 let item2 = vec![0x01, 0x02];
689 tokio::spawn(mock_plc_multi_read(server_io, vec![item1.clone(), item2.clone()]));
690 let client = S7Client::from_transport(client_io, params).await.unwrap();
691 let items = [MultiReadItem::db(1, 0, 4), MultiReadItem::db(2, 10, 2)];
692 let results = client.read_multi_vars(&items).await.unwrap();
693 assert_eq!(results.len(), 2);
694 assert_eq!(&results[0][..], &item1[..]);
695 assert_eq!(&results[1][..], &item2[..]);
696 }
697
698 #[tokio::test]
699 async fn read_multi_vars_empty_returns_empty() {
700 let (client_io, server_io) = duplex(4096);
701 let params = ConnectParams::default();
702 tokio::spawn(mock_plc_multi_read(server_io, vec![]));
703 let client = S7Client::from_transport(client_io, params).await.unwrap();
704 let results = client.read_multi_vars(&[]).await.unwrap();
705 assert!(results.is_empty());
706 }
707
708 async fn mock_plc_multi_write(
711 mut server_io: tokio::io::DuplexStream,
712 pdu_size: u16,
713 batches: Vec<usize>,
714 ) {
715 let mut buf = vec![0u8; 65536];
716
717 let _ = server_io.read(&mut buf).await;
719 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
720 let mut cb = BytesMut::new(); cc.encode(&mut cb);
721 let mut tb = BytesMut::new();
722 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
723 server_io.write_all(&tb).await.unwrap();
724
725 let _ = server_io.read(&mut buf).await;
727 let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: pdu_size };
728 let mut s7b = BytesMut::new();
729 S7Header {
730 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
731 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
732 }.encode(&mut s7b);
733 neg.encode(&mut s7b);
734 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
735 let mut cb = BytesMut::new(); dt.encode(&mut cb);
736 let mut tb = BytesMut::new();
737 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
738 server_io.write_all(&tb).await.unwrap();
739
740 for (i, item_count) in batches.iter().enumerate() {
742 let _ = server_io.read(&mut buf).await;
743 let mut s7b = BytesMut::new();
745 S7Header {
746 pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
747 param_len: 2, data_len: *item_count as u16,
748 error_class: Some(0), error_code: Some(0),
749 }.encode(&mut s7b);
750 s7b.extend_from_slice(&[0x05, *item_count as u8]); for _ in 0..*item_count {
752 s7b.put_u8(0xFF); }
754 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
755 let mut cb = BytesMut::new(); dt.encode(&mut cb);
756 let mut tb = BytesMut::new();
757 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
758 server_io.write_all(&tb).await.unwrap();
759 }
760 }
761
762 #[tokio::test]
763 async fn write_multi_vars_returns_ok() {
764 let (client_io, server_io) = duplex(65536);
765 let params = ConnectParams::default();
766 tokio::spawn(mock_plc_multi_write(server_io, 480, vec![2]));
767 let client = S7Client::from_transport(client_io, params).await.unwrap();
768 let items = [
769 MultiWriteItem::db(1, 0, vec![0xAA, 0xBB, 0xCC, 0xDD]),
770 MultiWriteItem::db(2, 10, vec![0x01, 0x02]),
771 ];
772 client.write_multi_vars(&items).await.unwrap();
773 }
774
775 #[tokio::test]
776 async fn write_multi_vars_empty_returns_ok() {
777 let (client_io, server_io) = duplex(4096);
778 let params = ConnectParams::default();
779 tokio::spawn(mock_plc_multi_write(server_io, 480, vec![]));
781 let client = S7Client::from_transport(client_io, params).await.unwrap();
782 client.write_multi_vars(&[]).await.unwrap();
783 }
784
785 #[tokio::test]
791 async fn write_multi_vars_batches_when_pdu_limit_exceeded() {
792 let (client_io, server_io) = duplex(65536);
793 let params = ConnectParams::default();
794 tokio::spawn(mock_plc_multi_write(server_io, 64, vec![1, 1]));
795 let client = S7Client::from_transport(client_io, params).await.unwrap();
796 let items = [
797 MultiWriteItem::db(1, 0, vec![0x11u8; 20]),
798 MultiWriteItem::db(2, 0, vec![0x22u8; 20]),
799 ];
800 client.write_multi_vars(&items).await.unwrap();
801 }
802
803 #[tokio::test]
809 async fn read_multi_vars_batches_when_pdu_limit_exceeded() {
810 use crate::proto::s7::negotiate::NegotiateResponse;
811
812 async fn mock_split_pdu(mut server_io: tokio::io::DuplexStream) {
813 let mut buf = vec![0u8; 4096];
814
815 let _ = server_io.read(&mut buf).await;
817 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
818 let mut cb = BytesMut::new(); cc.encode(&mut cb);
819 let mut tb = BytesMut::new();
820 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
821 server_io.write_all(&tb).await.unwrap();
822
823 let _ = server_io.read(&mut buf).await;
825 let neg = NegotiateResponse {
826 max_amq_calling: 1, max_amq_called: 1, pdu_length: 64,
827 };
828 let mut s7b = BytesMut::new();
829 S7Header {
830 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
831 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
832 }.encode(&mut s7b);
833 neg.encode(&mut s7b);
834 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
835 let mut cb = BytesMut::new(); dt.encode(&mut cb);
836 let mut tb = BytesMut::new();
837 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
838 server_io.write_all(&tb).await.unwrap();
839
840 let payloads: &[&[u8]] = &[&[0x11u8; 30], &[0x22u8; 30]];
842 for (i, payload) in payloads.iter().enumerate() {
843 let _ = server_io.read(&mut buf).await;
844 let bit_len = (payload.len() * 8) as u16;
845 let mut data_bytes = BytesMut::new();
846 data_bytes.put_u8(0xFF);
847 data_bytes.put_u8(0x04);
848 data_bytes.put_u16(bit_len);
849 data_bytes.extend_from_slice(payload);
850 if payload.len() % 2 != 0 { data_bytes.put_u8(0x00); }
851 let data_len = data_bytes.len() as u16;
852 let mut s7b = BytesMut::new();
853 S7Header {
854 pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
855 param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
856 }.encode(&mut s7b);
857 s7b.extend_from_slice(&[0x04, 0x01]);
858 s7b.extend_from_slice(&data_bytes);
859 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
860 let mut cb = BytesMut::new(); dt.encode(&mut cb);
861 let mut tb = BytesMut::new();
862 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
863 server_io.write_all(&tb).await.unwrap();
864 }
865 }
866
867 let (client_io, server_io) = duplex(4096);
868 let params = ConnectParams::default();
869 tokio::spawn(mock_split_pdu(server_io));
870 let client = S7Client::from_transport(client_io, params).await.unwrap();
871
872 let items = [MultiReadItem::db(1, 0, 30), MultiReadItem::db(2, 0, 30)];
873 let results = client.read_multi_vars(&items).await.unwrap();
874 assert_eq!(results.len(), 2);
875 assert_eq!(&results[0][..], &[0x11u8; 30][..]);
876 assert_eq!(&results[1][..], &[0x22u8; 30][..]);
877 }
878}