1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::{
7 cotp::CotpPdu,
8 s7::{
9 clock::PlcDateTime,
10 header::{Area, PduType, S7Header, TransportSize},
11 read_var::{AddressItem, ReadVarRequest, ReadVarResponse},
12 szl::{SzlRequest, SzlResponse},
13 write_var::{WriteItem, WriteVarRequest, WriteVarResponse},
14 },
15 tpkt::TpktFrame,
16};
17
18use crate::{
19 connection::{connect, Connection},
20 error::{Error, Result},
21 types::ConnectParams,
22};
23
24#[derive(Debug, Clone)]
26pub struct MultiReadItem {
27 pub area: Area,
28 pub db_number: u16,
29 pub start: u32,
30 pub length: u16,
31 pub transport: TransportSize,
32}
33
34impl MultiReadItem {
35 pub fn db(db: u16, start: u32, length: u16) -> Self {
37 Self {
38 area: Area::DataBlock,
39 db_number: db,
40 start,
41 length,
42 transport: TransportSize::Byte,
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct MultiWriteItem {
50 pub area: Area,
51 pub db_number: u16,
52 pub start: u32,
53 pub data: Bytes,
54}
55
56impl MultiWriteItem {
57 pub fn db(db: u16, start: u32, data: impl Into<Bytes>) -> Self {
59 Self {
60 area: Area::DataBlock,
61 db_number: db,
62 start,
63 data: data.into(),
64 }
65 }
66}
67
68struct Inner<T> {
69 transport: T,
70 connection: Connection,
71 pdu_ref: u16,
72 request_timeout: std::time::Duration,
73}
74
75pub struct S7Client<T: AsyncRead + AsyncWrite + Unpin + Send> {
76 inner: Mutex<Inner<T>>,
77 params: ConnectParams,
78}
79
80impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7Client<T> {
81 pub async fn from_transport(transport: T, params: ConnectParams) -> Result<Self> {
82 let mut t = transport;
83 let connection = connect(&mut t, ¶ms).await?;
84 let timeout = params.request_timeout;
85 Ok(S7Client {
86 inner: Mutex::new(Inner {
87 transport: t,
88 connection,
89 pdu_ref: 1,
90 request_timeout: timeout,
91 }),
92 params,
93 })
94 }
95
96 pub fn request_timeout(&self) -> std::time::Duration {
98 self.params.request_timeout
99 }
100
101 pub async fn set_request_timeout(&self, timeout: std::time::Duration) {
105 let mut inner = self.inner.lock().await;
106 inner.request_timeout = timeout;
107 }
108
109 pub fn get_param(&self, name: &str) -> Result<std::time::Duration> {
113 match name {
114 "request_timeout" => Ok(self.params.request_timeout),
115 "connect_timeout" => Ok(self.params.connect_timeout),
116 "pdu_size" => Err(Error::PlcError {
117 code: 0,
118 message: "pdu_size is not a Duration; use .params.pdu_size directly".into(),
119 }),
120 _ => Err(Error::PlcError {
121 code: 0,
122 message: format!("unknown parameter: {name}"),
123 }),
124 }
125 }
126
127 pub fn set_param(&mut self, name: &str, value: std::time::Duration) -> Result<()> {
131 match name {
132 "request_timeout" => {
133 self.params.request_timeout = value;
134 Ok(())
135 }
136 _ => Err(Error::PlcError {
137 code: 0,
138 message: format!("unknown parameter: {name}"),
139 }),
140 }
141 }
142
143 fn next_pdu_ref(inner: &mut Inner<T>) -> u16 {
144 inner.pdu_ref = inner.pdu_ref.wrapping_add(1);
145 inner.pdu_ref
146 }
147
148 async fn send_s7(
149 inner: &mut Inner<T>,
150 param_buf: Bytes,
151 data_buf: Bytes,
152 pdu_ref: u16,
153 pdu_type: PduType,
154 ) -> Result<()> {
155 let header = S7Header {
156 pdu_type,
157 reserved: 0,
158 pdu_ref,
159 param_len: param_buf.len() as u16,
160 data_len: data_buf.len() as u16,
161 error_class: None,
162 error_code: None,
163 };
164 let mut s7b = BytesMut::new();
165 header.encode(&mut s7b);
166 s7b.extend_from_slice(¶m_buf);
167 s7b.extend_from_slice(&data_buf);
168
169 let dt = CotpPdu::Data {
170 tpdu_nr: 0,
171 last: true,
172 payload: s7b.freeze(),
173 };
174 let mut cotpb = BytesMut::new();
175 dt.encode(&mut cotpb);
176 let tpkt = TpktFrame {
177 payload: cotpb.freeze(),
178 };
179 let mut tb = BytesMut::new();
180 tpkt.encode(&mut tb)?;
181 inner.transport.write_all(&tb).await?;
182 Ok(())
183 }
184
185 async fn recv_s7(inner: &mut Inner<T>) -> Result<(S7Header, Bytes)> {
186 let timeout = inner.request_timeout;
187 let mut tpkt_hdr = [0u8; 4];
188 tokio::time::timeout(timeout, inner.transport.read_exact(&mut tpkt_hdr))
189 .await
190 .map_err(|_| Error::Timeout(timeout))??;
191 let total = u16::from_be_bytes([tpkt_hdr[2], tpkt_hdr[3]]) as usize;
192 if total < 4 {
193 return Err(Error::UnexpectedResponse);
194 }
195 let mut payload = vec![0u8; total - 4];
196 tokio::time::timeout(timeout, inner.transport.read_exact(&mut payload))
197 .await
198 .map_err(|_| Error::Timeout(timeout))??;
199 let mut b = Bytes::from(payload);
200
201 if b.remaining() < 3 {
203 return Err(Error::UnexpectedResponse);
204 }
205 let _li = b.get_u8();
206 let cotp_code = b.get_u8();
207 if cotp_code != 0xF0 {
208 return Err(Error::UnexpectedResponse);
209 }
210 b.advance(1); let header = S7Header::decode(&mut b)?;
213 Ok((header, b))
214 }
215
216 pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes> {
217 let mut inner = self.inner.lock().await;
218 let pdu_ref = Self::next_pdu_ref(&mut inner);
219
220 let req = ReadVarRequest {
221 items: vec![AddressItem {
222 area: Area::DataBlock,
223 db_number: db,
224 start,
225 bit_offset: 0,
226 length,
227 transport: TransportSize::Byte,
228 }],
229 };
230 let mut param_buf = BytesMut::new();
231 req.encode(&mut param_buf);
232
233 Self::send_s7(
234 &mut inner,
235 param_buf.freeze(),
236 Bytes::new(),
237 pdu_ref,
238 PduType::Job,
239 )
240 .await?;
241
242 let (header, mut body) = Self::recv_s7(&mut inner).await?;
243 check_plc_error(&header, "db_read")?;
244 if body.remaining() >= 2 {
245 body.advance(2); }
247 let resp = ReadVarResponse::decode(&mut body, 1)?;
248 if resp.items.is_empty() {
249 return Err(Error::UnexpectedResponse);
250 }
251 if resp.items[0].return_code != 0xFF {
252 return Err(Error::PlcError {
253 code: resp.items[0].return_code as u32,
254 message: "item error".into(),
255 });
256 }
257 Ok(resp.items[0].data.clone())
258 }
259
260 pub async fn read_multi_vars(&self, items: &[MultiReadItem]) -> Result<Vec<Bytes>> {
268 if items.is_empty() {
269 return Ok(Vec::new());
270 }
271
272 const S7_HEADER: usize = 10;
275 const PARAM_OVERHEAD: usize = 2; const ADDR_ITEM_SIZE: usize = 12;
277 const DATA_ITEM_OVERHEAD: usize = 4;
279 const MAX_ITEMS_PER_PDU: usize = 20;
280
281 let mut inner = self.inner.lock().await;
282 let pdu_size = inner.connection.pdu_size as usize;
283 let max_req_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
284 let max_resp_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
285
286 let mut results = vec![Bytes::new(); items.len()];
287 let mut batch_start = 0;
288
289 while batch_start < items.len() {
290 let mut batch_end = batch_start;
292 let mut req_bytes_used = 0usize;
293 let mut resp_bytes_used = 0usize;
294
295 while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
296 let item = &items[batch_end];
297 let item_resp_size =
298 DATA_ITEM_OVERHEAD + item.length as usize + (item.length as usize % 2);
299
300 if batch_end > batch_start
301 && (req_bytes_used + ADDR_ITEM_SIZE > max_req_payload
302 || resp_bytes_used + item_resp_size > max_resp_payload)
303 {
304 break;
305 }
306 req_bytes_used += ADDR_ITEM_SIZE;
307 resp_bytes_used += item_resp_size;
308 batch_end += 1;
309 }
310
311 let batch = &items[batch_start..batch_end];
312 let pdu_ref = Self::next_pdu_ref(&mut inner);
313
314 let req = ReadVarRequest {
315 items: batch
316 .iter()
317 .map(|item| AddressItem {
318 area: item.area,
319 db_number: item.db_number,
320 start: item.start,
321 bit_offset: 0,
322 length: item.length,
325 transport: TransportSize::Byte,
326 })
327 .collect(),
328 };
329 let mut param_buf = BytesMut::new();
330 req.encode(&mut param_buf);
331
332 Self::send_s7(
333 &mut inner,
334 param_buf.freeze(),
335 Bytes::new(),
336 pdu_ref,
337 PduType::Job,
338 )
339 .await?;
340
341 let (header, mut body) = Self::recv_s7(&mut inner).await?;
342 check_plc_error(&header, "read_multi_vars")?;
343 if body.remaining() >= 2 {
344 body.advance(2); }
346 let resp = ReadVarResponse::decode(&mut body, batch.len())?;
347
348 for (i, item) in resp.items.into_iter().enumerate() {
349 if item.return_code != 0xFF {
350 return Err(Error::PlcError {
351 code: item.return_code as u32,
352 message: format!("item {} error", batch_start + i),
353 });
354 }
355 results[batch_start + i] = item.data;
356 }
357
358 batch_start = batch_end;
359 }
360
361 Ok(results)
362 }
363
364 pub async fn write_multi_vars(&self, items: &[MultiWriteItem]) -> Result<()> {
370 if items.is_empty() {
371 return Ok(());
372 }
373
374 const S7_HEADER: usize = 10;
375 const PARAM_OVERHEAD: usize = 2; const ADDR_ITEM_SIZE: usize = 12;
377 const DATA_ITEM_OVERHEAD: usize = 4; const MAX_ITEMS_PER_PDU: usize = 20;
379
380 let mut inner = self.inner.lock().await;
381 let pdu_size = inner.connection.pdu_size as usize;
382 let max_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
383
384 let mut batch_start = 0;
385
386 while batch_start < items.len() {
387 let mut batch_end = batch_start;
388 let mut bytes_used = 0usize;
389
390 while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
391 let item = &items[batch_end];
392 let data_len = item.data.len();
393 let item_size = ADDR_ITEM_SIZE + DATA_ITEM_OVERHEAD + data_len + (data_len % 2);
394
395 if batch_end > batch_start && bytes_used + item_size > max_payload {
396 break;
397 }
398 bytes_used += item_size;
399 batch_end += 1;
400 }
401
402 let batch = &items[batch_start..batch_end];
403 let pdu_ref = Self::next_pdu_ref(&mut inner);
404
405 let req = WriteVarRequest {
406 items: batch
407 .iter()
408 .map(|item| WriteItem {
409 address: AddressItem {
410 area: item.area,
411 db_number: item.db_number,
412 start: item.start,
413 bit_offset: 0,
414 length: item.data.len() as u16,
415 transport: TransportSize::Byte,
416 },
417 data: item.data.clone(),
418 })
419 .collect(),
420 };
421 let mut param_buf = BytesMut::new();
422 req.encode(&mut param_buf);
423
424 Self::send_s7(
425 &mut inner,
426 param_buf.freeze(),
427 Bytes::new(),
428 pdu_ref,
429 PduType::Job,
430 )
431 .await?;
432
433 let (header, mut body) = Self::recv_s7(&mut inner).await?;
434 check_plc_error(&header, "write_multi_vars")?;
435 if body.remaining() >= 2 {
436 body.advance(2); }
438 let resp = WriteVarResponse::decode(&mut body, batch.len())?;
439 for (i, &code) in resp.return_codes.iter().enumerate() {
440 if code != 0xFF {
441 return Err(Error::PlcError {
442 code: code as u32,
443 message: format!("item {} write error", batch_start + i),
444 });
445 }
446 }
447
448 batch_start = batch_end;
449 }
450
451 Ok(())
452 }
453
454 pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<()> {
455 let mut inner = self.inner.lock().await;
456 let pdu_ref = Self::next_pdu_ref(&mut inner);
457
458 let req = WriteVarRequest {
459 items: vec![WriteItem {
460 address: AddressItem {
461 area: Area::DataBlock,
462 db_number: db,
463 start,
464 bit_offset: 0,
465 length: data.len() as u16,
466 transport: TransportSize::Byte,
467 },
468 data: Bytes::copy_from_slice(data),
469 }],
470 };
471 let mut param_buf = BytesMut::new();
472 req.encode(&mut param_buf);
473
474 Self::send_s7(
475 &mut inner,
476 param_buf.freeze(),
477 Bytes::new(),
478 pdu_ref,
479 PduType::Job,
480 )
481 .await?;
482
483 let (header, mut body) = Self::recv_s7(&mut inner).await?;
484 check_plc_error(&header, "db_write")?;
485 if body.has_remaining() {
486 body.advance(2); }
488 let resp = WriteVarResponse::decode(&mut body, 1)?;
489 if resp.return_codes[0] != 0xFF {
490 return Err(Error::PlcError {
491 code: resp.return_codes[0] as u32,
492 message: "write error".into(),
493 });
494 }
495 Ok(())
496 }
497
498 pub async fn ab_read(
503 &self,
504 area: Area,
505 db_number: u16,
506 start: u32,
507 length: u16,
508 ) -> Result<Bytes> {
509 let items = [MultiReadItem {
510 area,
511 db_number,
512 start,
513 length,
514 transport: TransportSize::Byte,
515 }];
516 let mut results = self.read_multi_vars(&items).await?;
517 Ok(results.swap_remove(0))
518 }
519
520 pub async fn ab_write(
525 &self,
526 area: Area,
527 db_number: u16,
528 start: u32,
529 data: &[u8],
530 ) -> Result<()> {
531 let items = [MultiWriteItem {
532 area,
533 db_number,
534 start,
535 data: Bytes::copy_from_slice(data),
536 }];
537 self.write_multi_vars(&items).await
538 }
539
540 pub async fn read_szl(&self, szl_id: u16, szl_index: u16) -> Result<SzlResponse> {
541 let payload = self.read_szl_payload(szl_id, szl_index).await?;
542 let mut b = payload;
543 Ok(SzlResponse::decode(&mut b)?)
544 }
545
546 async fn read_szl_payload(&self, szl_id: u16, szl_index: u16) -> Result<Bytes> {
549 let mut inner = self.inner.lock().await;
550 let pdu_ref = Self::next_pdu_ref(&mut inner);
551
552 let req = SzlRequest { szl_id, szl_index };
553 let mut param_buf = BytesMut::new();
554 req.encode(&mut param_buf);
555
556 Self::send_s7(
557 &mut inner,
558 param_buf.freeze(),
559 Bytes::new(),
560 pdu_ref,
561 PduType::UserData,
562 )
563 .await?;
564
565 let (header, mut body) = Self::recv_s7(&mut inner).await?;
566
567 if body.remaining() < header.param_len as usize {
569 return Err(Error::UnexpectedResponse);
570 }
571 body.advance(header.param_len as usize);
572
573 if body.remaining() < 4 {
576 return Err(Error::UnexpectedResponse);
577 }
578 body.advance(4);
579
580 Ok(body.copy_to_bytes(body.remaining()))
582 }
583
584 pub async fn read_clock(&self) -> Result<PlcDateTime> {
585 let mut inner = self.inner.lock().await;
586 let pdu_ref = Self::next_pdu_ref(&mut inner);
587 let mut param_buf = BytesMut::new();
588 param_buf.extend_from_slice(&[0x00, 0x01, 0x12, 0x04, 0xF5, 0x00]);
589 Self::send_s7(
590 &mut inner,
591 param_buf.freeze(),
592 Bytes::new(),
593 pdu_ref,
594 PduType::UserData,
595 )
596 .await?;
597 let (_header, mut body) = Self::recv_s7(&mut inner).await?;
598 if body.remaining() > 8 {
599 body.advance(body.remaining() - 8);
600 }
601 Ok(PlcDateTime::decode(&mut body)?)
602 }
603
604 pub async fn copy_ram_to_rom(&self) -> Result<()> {
608 let mut inner = self.inner.lock().await;
609 let pdu_ref = Self::next_pdu_ref(&mut inner);
610 let param = Bytes::copy_from_slice(&[
611 0x00, 0x01, 0x12, 0x04, 0x43, 0x44, 0x01, 0x00,
612 ]);
613 Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::UserData).await?;
614 let (header, _body) = Self::recv_s7(&mut inner).await?;
615 check_plc_error(&header, "copy_ram_to_rom")?;
616 Ok(())
617 }
618
619 pub async fn compress(&self) -> Result<()> {
624 let mut inner = self.inner.lock().await;
625 let pdu_ref = Self::next_pdu_ref(&mut inner);
626 let param = Bytes::copy_from_slice(&[
627 0x00, 0x01, 0x12, 0x04, 0x42, 0x44, 0x01, 0x00,
628 ]);
629 Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::UserData).await?;
630 let (header, _body) = Self::recv_s7(&mut inner).await?;
631 check_plc_error(&header, "compress")?;
632 Ok(())
633 }
634
635 async fn simple_control(inner: &mut Inner<T>, pdu_ref: u16, func: u8) -> Result<()> {
639 let param = Bytes::copy_from_slice(&[func, 0x00]);
640 Self::send_s7(inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
641 let (header, _body) = Self::recv_s7(inner).await?;
642 check_plc_error(&header, "plc_control")?;
643 Ok(())
644 }
645
646 pub async fn plc_stop(&self) -> Result<()> {
652 let mut inner = self.inner.lock().await;
653 let pdu_ref = Self::next_pdu_ref(&mut inner);
654 Self::simple_control(&mut inner, pdu_ref, 0x29).await
655 }
656
657 pub async fn plc_hot_start(&self) -> Result<()> {
661 let mut inner = self.inner.lock().await;
662 let pdu_ref = Self::next_pdu_ref(&mut inner);
663 Self::simple_control(&mut inner, pdu_ref, 0x28).await
664 }
665
666 pub async fn plc_cold_start(&self) -> Result<()> {
670 let mut inner = self.inner.lock().await;
671 let pdu_ref = Self::next_pdu_ref(&mut inner);
672 Self::simple_control(&mut inner, pdu_ref, 0x2A).await
673 }
674
675 pub async fn get_plc_status(&self) -> Result<crate::types::PlcStatus> {
680 let mut inner = self.inner.lock().await;
681 let pdu_ref = Self::next_pdu_ref(&mut inner);
682 let param = Bytes::copy_from_slice(&[0x31, 0x00]);
683 Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
684 let (header, mut body) = Self::recv_s7(&mut inner).await?;
685 check_plc_error(&header, "get_plc_status")?;
686 if body.remaining() >= 2 {
688 body.advance(2);
689 }
690 if body.remaining() < 1 {
691 return Err(Error::UnexpectedResponse);
692 }
693 let status_byte = body.get_u8();
694 match status_byte {
695 0x00 => Ok(crate::types::PlcStatus::Unknown),
696 0x04 => Ok(crate::types::PlcStatus::Stop),
697 0x08 => Ok(crate::types::PlcStatus::Run),
698 other => Err(Error::PlcError {
699 code: other as u32,
700 message: format!("unknown PLC status byte: 0x{other:02X}"),
701 }),
702 }
703 }
704
705 pub async fn get_order_code(&self) -> Result<crate::types::OrderCode> {
711 let payload = self.read_szl_payload(0x0011, 0x0000).await?;
712 if payload.len() < 8 {
713 return Err(Error::UnexpectedResponse);
714 }
715 let mut b = payload;
716 let _block_len = b.get_u16();
717 let _szl_id = b.get_u16();
718 let _szl_ix = b.get_u16();
719
720 let resp_szl_id = _szl_id;
723 if resp_szl_id != 0x0011 {
724 return Err(Error::PlcError {
725 code: 0,
726 message: format!(
727 "order-code query (SZL 0x0011) not supported by this PLC (returned szl_id=0x{:04X})",
728 resp_szl_id
729 ),
730 });
731 }
732
733 let code_bytes = &b[..b.len().min(20)];
735 let code = String::from_utf8_lossy(code_bytes).trim().to_string();
736 Ok(crate::types::OrderCode { code })
737 }
738
739 pub async fn get_cpu_info(&self) -> Result<crate::types::CpuInfo> {
744 let payload = self.read_szl_payload(0x001C, 0x0000).await?;
745 if payload.len() < 8 {
746 return Err(Error::UnexpectedResponse);
747 }
748 let mut b = payload;
749 let _block_len = b.get_u16();
750 let _szl_id = b.get_u16();
751 let _szl_ix = b.get_u16();
752
753 let resp_szl_id = _szl_id;
756 if resp_szl_id != 0x001C {
757 return Err(Error::PlcError {
758 code: 0,
759 message: format!(
760 "cpu-info query (SZL 0x001C) not supported by this PLC (returned szl_id=0x{:04X})",
761 resp_szl_id
762 ),
763 });
764 }
765
766 let module_type = extract_szl_string(&b, 0, 24);
773 let serial_number = extract_szl_string(&b, 24, 48);
774 let as_name = extract_szl_string(&b, 48, 72);
775 let copyright = extract_szl_string(&b, 72, 98);
776 let module_name = extract_szl_string(&b, 98, 122);
777 Ok(crate::types::CpuInfo {
778 module_type,
779 serial_number,
780 as_name,
781 copyright,
782 module_name,
783 })
784 }
785
786 pub async fn get_cp_info(&self) -> Result<crate::types::CpInfo> {
790 let payload = self.read_szl_payload(0x0131, 0x0000).await?;
791 if payload.len() < 14 {
792 return Err(Error::UnexpectedResponse);
793 }
794 let mut b = payload;
795 let _block_len = b.get_u16();
796 let _szl_id = b.get_u16();
797 let _szl_ix = b.get_u16();
798 skip_szl_entry_header(&mut b);
800 let max_pdu_len = b.get_u32();
802 let max_connections = b.get_u32();
803 let max_mpi_rate = b.get_u32();
804 let max_bus_rate = b.get_u32();
805 Ok(crate::types::CpInfo {
806 max_pdu_len,
807 max_connections,
808 max_mpi_rate,
809 max_bus_rate,
810 })
811 }
812
813 pub async fn read_module_list(&self) -> Result<Vec<crate::types::ModuleEntry>> {
817 let payload = self.read_szl_payload(0x00A0, 0x0000).await?;
818 if payload.len() < 6 {
819 return Err(Error::UnexpectedResponse);
820 }
821 let mut b = payload;
822 let _block_len = b.get_u16();
823 let _szl_id = b.get_u16();
824 let _szl_ix = b.get_u16();
825 skip_szl_entry_header(&mut b);
827 let mut modules = Vec::new();
828 while b.remaining() >= 2 {
829 modules.push(crate::types::ModuleEntry {
830 module_type: b.get_u16(),
831 });
832 }
833 Ok(modules)
834 }
835
836 pub async fn list_blocks(&self) -> Result<crate::types::BlockList> {
842 let payload = self.read_szl_payload(0x0130, 0x0000).await?;
843 if payload.len() < 10 {
844 return Err(Error::UnexpectedResponse);
845 }
846 let mut b = payload;
847 let _block_len = b.get_u16();
848 let _szl_id = b.get_u16();
849 let _szl_ix = b.get_u16();
850 skip_szl_entry_header(&mut b);
852 let total_count = b.get_u32();
853 let mut entries = Vec::new();
854 while b.remaining() >= 4 {
855 entries.push(crate::types::BlockListEntry {
856 block_type: b.get_u16(),
857 count: b.get_u16(),
858 });
859 }
860 Ok(crate::types::BlockList {
861 total_count,
862 entries,
863 })
864 }
865
866 async fn block_info_query(
869 &self,
870 func: u8,
871 block_type: u8,
872 block_number: u16,
873 ) -> Result<Bytes> {
874 let mut inner = self.inner.lock().await;
875 let pdu_ref = Self::next_pdu_ref(&mut inner);
876
877 let mut param_buf = BytesMut::with_capacity(12);
880 param_buf.extend_from_slice(&[
881 0x00, 0x01, 0x12, 0x04, func, 0x44, 0x01, 0x00,
882 block_type, 0x00,
883 ]);
884 param_buf.put_u16(block_number);
885
886 Self::send_s7(
887 &mut inner,
888 param_buf.freeze(),
889 Bytes::new(),
890 pdu_ref,
891 PduType::UserData,
892 )
893 .await?;
894
895 let (header, mut body) = Self::recv_s7(&mut inner).await?;
896
897 if body.remaining() < header.param_len as usize {
899 return Err(Error::UnexpectedResponse);
900 }
901 body.advance(header.param_len as usize);
902
903 if body.remaining() < 4 {
905 return Err(Error::UnexpectedResponse);
906 }
907 body.advance(4);
908
909 Ok(body.copy_to_bytes(body.remaining()))
910 }
911
912 pub async fn get_ag_block_info(
917 &self,
918 block_type: u8,
919 block_number: u16,
920 ) -> Result<crate::types::BlockInfo> {
921 self.get_block_info(0x13, block_type, block_number).await
922 }
923
924 pub async fn get_pg_block_info(
929 &self,
930 block_type: u8,
931 block_number: u16,
932 ) -> Result<crate::types::BlockInfo> {
933 self.get_block_info(0x14, block_type, block_number).await
934 }
935
936 async fn get_block_info(
938 &self,
939 func: u8,
940 block_type: u8,
941 block_number: u16,
942 ) -> Result<crate::types::BlockInfo> {
943 let payload = self
944 .block_info_query(func, block_type, block_number)
945 .await?;
946 if payload.len() < 24 {
948 return Err(Error::UnexpectedResponse);
949 }
950 let mut b = payload;
951
952 let _blk_type_hi = b.get_u16(); let blk_number = b.get_u16();
955 let language = b.get_u16();
956 let flags = b.get_u16();
957 let mc7_size = b.get_u16();
958 let _size_lo = b.get_u16(); let size_ram = b.get_u16();
960 let _size_ro = b.get_u16(); let local_data = b.get_u16();
962 let checksum = b.get_u16();
963 let version = b.get_u16();
964
965 let author = if b.remaining() >= 8 {
967 String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
968 } else { String::new() };
969 b.advance(8.min(b.remaining()));
970
971 let family = if b.remaining() >= 8 {
972 String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
973 } else { String::new() };
974 b.advance(8.min(b.remaining()));
975
976 let header = if b.remaining() >= 20 {
977 String::from_utf8_lossy(&b[..20]).trim_end_matches('\0').trim().to_string()
978 } else { String::new() };
979 b.advance(20.min(b.remaining()));
980
981 let date = if b.remaining() >= 8 {
982 String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
983 } else { String::new() };
984
985 let size = ((_blk_type_hi as u32) << 16) | (b.len() as u32 & 0xFFFF);
987 let size_u16 = size.min(0xFFFF) as u16;
988
989 Ok(crate::types::BlockInfo {
990 block_type: _blk_type_hi,
991 block_number: blk_number,
992 language,
993 flags,
994 size: size_u16,
995 size_ram,
996 mc7_size,
997 local_data,
998 checksum,
999 version,
1000 author,
1001 family,
1002 header,
1003 date,
1004 })
1005 }
1006
1007 pub async fn set_session_password(&self, password: &str) -> Result<()> {
1015 let encrypted = crate::types::encrypt_password(password);
1016 let mut inner = self.inner.lock().await;
1017 let pdu_ref = Self::next_pdu_ref(&mut inner);
1018 let param = Bytes::copy_from_slice(&[0x12, 0x00]);
1019 let data = Bytes::copy_from_slice(&encrypted);
1020 Self::send_s7(&mut inner, param, data, pdu_ref, PduType::Job).await?;
1021 let (header, _body) = Self::recv_s7(&mut inner).await?;
1022 check_plc_error(&header, "set_session_password")?;
1023 Ok(())
1024 }
1025
1026 pub async fn clear_session_password(&self) -> Result<()> {
1028 let mut inner = self.inner.lock().await;
1029 let pdu_ref = Self::next_pdu_ref(&mut inner);
1030 let param = Bytes::copy_from_slice(&[0x11, 0x00]);
1031 Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
1032 let (header, _body) = Self::recv_s7(&mut inner).await?;
1033 check_plc_error(&header, "clear_session_password")?;
1034 Ok(())
1035 }
1036
1037 pub async fn get_protection(&self) -> Result<crate::types::Protection> {
1042 let payload = self.read_szl_payload(0x0032, 0x0004).await?;
1043 if payload.len() < 14 {
1044 return Err(Error::UnexpectedResponse);
1045 }
1046 let mut b = payload;
1047 let _block_len = b.get_u16();
1048 let _szl_id = b.get_u16();
1049 let _szl_ix = b.get_u16();
1050 skip_szl_entry_header(&mut b);
1052 let scheme_szl = b.get_u16();
1053 let scheme_module = b.get_u16();
1054 let scheme_bus = b.get_u16();
1055 let level = b.get_u16();
1056 let pass_wort = if b.remaining() >= 8 {
1058 String::from_utf8_lossy(&b[..8]).trim().to_string()
1059 } else {
1060 String::new()
1061 };
1062 let password_set = pass_wort.eq_ignore_ascii_case("PASSWORD");
1063 Ok(crate::types::Protection {
1064 scheme_szl,
1065 scheme_module,
1066 scheme_bus,
1067 level,
1068 password_set,
1069 })
1070 }
1071
1072 pub async fn delete_block(&self, block_type: u8, block_number: u16) -> Result<()> {
1080 let mut inner = self.inner.lock().await;
1081 let pdu_ref = Self::next_pdu_ref(&mut inner);
1082 let mut param = BytesMut::with_capacity(6);
1084 param.extend_from_slice(&[0x1F, 0x00, block_type, 0x00]);
1085 param.put_u16(block_number);
1086 Self::send_s7(
1087 &mut inner,
1088 param.freeze(),
1089 Bytes::new(),
1090 pdu_ref,
1091 PduType::Job,
1092 )
1093 .await?;
1094 let (header, _body) = Self::recv_s7(&mut inner).await?;
1095 check_plc_error(&header, "delete_block")?;
1096 Ok(())
1097 }
1098
1099 pub async fn upload(&self, block_type: u8, block_number: u16) -> Result<Vec<u8>> {
1104 let mut inner = self.inner.lock().await;
1105 let pdu_ref = Self::next_pdu_ref(&mut inner);
1106
1107 let mut param = BytesMut::with_capacity(6);
1110 param.extend_from_slice(&[0x1D, 0x00, block_type, 0x00]);
1111 param.put_u16(block_number);
1112 Self::send_s7(
1113 &mut inner,
1114 param.freeze(),
1115 Bytes::new(),
1116 pdu_ref,
1117 PduType::Job,
1118 )
1119 .await?;
1120 let (header, mut body) = Self::recv_s7(&mut inner).await?;
1121 check_plc_error(&header, "upload_start")?;
1122 if body.remaining() < 8 {
1124 return Err(Error::UnexpectedResponse);
1125 }
1126 if body.remaining() >= 2 {
1127 body.advance(2); }
1129 let upload_id = body.get_u32();
1130 let _total_len = body.get_u32();
1131
1132 let mut block_data = Vec::new();
1134 loop {
1135 let chunk_pdu_ref = Self::next_pdu_ref(&mut inner);
1136 let mut dparam = BytesMut::with_capacity(6);
1137 dparam.extend_from_slice(&[0x1D, 0x01]);
1138 dparam.put_u32(upload_id);
1139 Self::send_s7(
1140 &mut inner,
1141 dparam.freeze(),
1142 Bytes::new(),
1143 chunk_pdu_ref,
1144 PduType::Job,
1145 )
1146 .await?;
1147 let (dheader, mut dbody) = Self::recv_s7(&mut inner).await?;
1148 check_plc_error(&dheader, "upload_data")?;
1149 if dbody.remaining() >= 2 {
1151 dbody.advance(2);
1152 }
1153 if dbody.is_empty() {
1154 break; }
1156 if block_data.is_empty() && dbody.remaining() >= 4 {
1159 if dbody[0] == 0xFF || dbody[0] == 0x00 {
1161 dbody.advance(4);
1162 }
1163 }
1164 let chunk = dbody.copy_to_bytes(dbody.remaining());
1165 block_data.extend_from_slice(&chunk);
1166
1167 if chunk.len() < inner.connection.pdu_size as usize - 50 {
1169 break;
1170 }
1171 if block_data.len() > 1024 * 1024 * 4 {
1173 return Err(Error::UnexpectedResponse);
1175 }
1176 }
1177
1178 let end_pdu_ref = Self::next_pdu_ref(&mut inner);
1180 let mut eparam = BytesMut::with_capacity(6);
1181 eparam.extend_from_slice(&[0x1D, 0x02]);
1182 eparam.put_u32(upload_id);
1183 Self::send_s7(
1184 &mut inner,
1185 eparam.freeze(),
1186 Bytes::new(),
1187 end_pdu_ref,
1188 PduType::Job,
1189 )
1190 .await?;
1191 let (eheader, _ebody) = Self::recv_s7(&mut inner).await?;
1192 check_plc_error(&eheader, "upload_end")?;
1193
1194 Ok(block_data)
1195 }
1196
1197 pub async fn db_get(&self, db_number: u16) -> Result<Vec<u8>> {
1199 self.upload(0x41, db_number).await }
1201
1202 pub async fn download(&self, block_type: u8, block_number: u16, data: &[u8]) -> Result<()> {
1207 let total_len = data.len() as u32;
1208 let mut inner = self.inner.lock().await;
1209 let pdu_avail = (inner.connection.pdu_size as usize).saturating_sub(50);
1210
1211 let start_ref = Self::next_pdu_ref(&mut inner);
1213 let mut sparam = BytesMut::with_capacity(10);
1215 sparam.extend_from_slice(&[0x1E, 0x00, block_type, 0x00]);
1216 sparam.put_u16(block_number);
1217 sparam.put_u32(total_len);
1218
1219 let chunk_len = pdu_avail.min(data.len());
1221 let first_chunk = Bytes::copy_from_slice(&data[..chunk_len]);
1222 Self::send_s7(
1223 &mut inner,
1224 sparam.freeze(),
1225 first_chunk,
1226 start_ref,
1227 PduType::Job,
1228 )
1229 .await?;
1230
1231 let (sheader, mut sbody) = Self::recv_s7(&mut inner).await?;
1232 check_plc_error(&sheader, "download_start")?;
1233 if sbody.remaining() >= 2 {
1235 sbody.advance(2); }
1237 if sbody.remaining() < 4 {
1238 return Err(Error::UnexpectedResponse);
1239 }
1240 let download_id = sbody.get_u32();
1241
1242 let mut offset = chunk_len;
1243
1244 while offset < data.len() {
1246 let chunk_ref = Self::next_pdu_ref(&mut inner);
1247 let end = (offset + pdu_avail).min(data.len());
1248 let chunk = Bytes::copy_from_slice(&data[offset..end]);
1249
1250 let mut dparam = BytesMut::with_capacity(6);
1251 dparam.extend_from_slice(&[0x1E, 0x01]);
1252 dparam.put_u32(download_id);
1253
1254 Self::send_s7(
1255 &mut inner,
1256 dparam.freeze(),
1257 chunk,
1258 chunk_ref,
1259 PduType::Job,
1260 )
1261 .await?;
1262
1263 let (dheader, _dbody) = Self::recv_s7(&mut inner).await?;
1264 check_plc_error(&dheader, "download_data")?;
1265 offset = end;
1266 }
1267
1268 let end_ref = Self::next_pdu_ref(&mut inner);
1270 let mut eparam = BytesMut::with_capacity(6);
1271 eparam.extend_from_slice(&[0x1E, 0x02]);
1272 eparam.put_u32(download_id);
1273 Self::send_s7(
1274 &mut inner,
1275 eparam.freeze(),
1276 Bytes::new(),
1277 end_ref,
1278 PduType::Job,
1279 )
1280 .await?;
1281 let (eheader, _ebody) = Self::recv_s7(&mut inner).await?;
1282 check_plc_error(&eheader, "download_end")?;
1283
1284 Ok(())
1285 }
1286
1287 pub async fn db_fill(&self, db_number: u16, value: u8) -> Result<()> {
1292 let info = self.get_ag_block_info(0x41, db_number).await?; let size = info.size as usize;
1294 if size == 0 {
1295 return Err(Error::PlcError {
1296 code: 0,
1297 message: format!("DB{db_number} has zero size"),
1298 });
1299 }
1300 let data = vec![value; size];
1301 let chunk_size = 240usize; for offset in (0..size).step_by(chunk_size) {
1304 let end = (offset + chunk_size).min(size);
1305 self.db_write(db_number, offset as u32, &data[offset..end])
1306 .await?;
1307 }
1308 Ok(())
1309 }
1310}
1311
1312fn skip_szl_entry_header(data: &mut Bytes) {
1316 if data.len() >= 2 && data[0] == 0x00 && data[1] > 0 && data[1] <= 200 {
1317 data.advance(2);
1318 }
1319}
1320
1321fn extract_szl_string(data: &[u8], start: usize, end: usize) -> String {
1325 if start >= data.len() {
1326 return String::new();
1327 }
1328 let end = end.min(data.len());
1329 String::from_utf8_lossy(&data[start..end]).trim().to_string()
1330}
1331
1332fn s7_error_description(ec: u8, ecd: u8) -> &'static str {
1334 match (ec, ecd) {
1335 (0x81, 0x04) => "function not supported or access denied by PLC",
1336 (0x81, 0x01) => "reserved by HW or SW function not available",
1337 (0x82, 0x04) => "PLC is in STOP mode, function not possible",
1338 (0x05, 0x01) => "invalid block type number",
1339 (0xD2, 0x01) => "object already exists, download rejected",
1340 (0xD2, 0x02) => "object does not exist, upload failed",
1341 (0xD6, 0x01) => "password protection violation",
1342 (0xD6, 0x05) => "insufficient privilege for this operation",
1343 _ => "unknown error",
1344 }
1345}
1346
1347fn check_plc_error(header: &S7Header, context: &str) -> Result<()> {
1348 if let (Some(ec), Some(ecd)) = (header.error_class, header.error_code) {
1349 if ec != 0 || ecd != 0 {
1350 let detail = s7_error_description(ec, ecd);
1351 return Err(Error::PlcError {
1352 code: ((ec as u32) << 8) | ecd as u32,
1353 message: format!("{}: {} (error_class=0x{ec:02X}, error_code=0x{ecd:02X})", context, detail),
1354 });
1355 }
1356 }
1357 Ok(())
1358}
1359
1360impl S7Client<crate::transport::TcpTransport> {
1361 pub async fn connect(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
1362 let transport =
1363 crate::transport::TcpTransport::connect(addr, params.connect_timeout).await?;
1364 Self::from_transport(transport, params).await
1365 }
1366}
1367
1368impl S7Client<crate::UdpTransport> {
1369 pub async fn connect_udp(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
1371 let transport = crate::UdpTransport::connect(addr)
1372 .await
1373 .map_err(Error::Io)?;
1374 Self::from_transport(transport, params).await
1375 }
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380 use super::*;
1381 use bytes::BufMut;
1382 use crate::proto::{
1383 cotp::CotpPdu,
1384 s7::{
1385 header::{PduType, S7Header},
1386 negotiate::NegotiateResponse,
1387 },
1388 tpkt::TpktFrame,
1389 };
1390 use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
1391
1392 async fn mock_plc_db_read(mut server_io: tokio::io::DuplexStream, response_data: Vec<u8>) {
1393 let mut buf = vec![0u8; 4096];
1394
1395 let _ = server_io.read(&mut buf).await;
1397 let cc = CotpPdu::ConnectConfirm {
1398 dst_ref: 1,
1399 src_ref: 1,
1400 };
1401 let mut cb = BytesMut::new();
1402 cc.encode(&mut cb);
1403 let mut tb = BytesMut::new();
1404 TpktFrame {
1405 payload: cb.freeze(),
1406 }
1407 .encode(&mut tb)
1408 .unwrap();
1409 server_io.write_all(&tb).await.unwrap();
1410
1411 let _ = server_io.read(&mut buf).await;
1413 let neg = NegotiateResponse {
1414 max_amq_calling: 1,
1415 max_amq_called: 1,
1416 pdu_length: 480,
1417 };
1418 let mut s7b = BytesMut::new();
1419 S7Header {
1420 pdu_type: PduType::AckData,
1421 reserved: 0,
1422 pdu_ref: 1,
1423 param_len: 8,
1424 data_len: 0,
1425 error_class: Some(0),
1426 error_code: Some(0),
1427 }
1428 .encode(&mut s7b);
1429 neg.encode(&mut s7b);
1430 let dt = CotpPdu::Data {
1431 tpdu_nr: 0,
1432 last: true,
1433 payload: s7b.freeze(),
1434 };
1435 let mut cb = BytesMut::new();
1436 dt.encode(&mut cb);
1437 let mut tb = BytesMut::new();
1438 TpktFrame {
1439 payload: cb.freeze(),
1440 }
1441 .encode(&mut tb)
1442 .unwrap();
1443 server_io.write_all(&tb).await.unwrap();
1444
1445 let _ = server_io.read(&mut buf).await;
1447 let mut s7b = BytesMut::new();
1448 S7Header {
1449 pdu_type: PduType::AckData,
1450 reserved: 0,
1451 pdu_ref: 2,
1452 param_len: 2,
1453 data_len: (4 + response_data.len()) as u16,
1454 error_class: Some(0),
1455 error_code: Some(0),
1456 }
1457 .encode(&mut s7b);
1458 s7b.extend_from_slice(&[0x04, 0x01]); s7b.put_u8(0xFF); s7b.put_u8(0x04); s7b.put_u16((response_data.len() * 8) as u16);
1462 s7b.extend_from_slice(&response_data);
1463 let dt = CotpPdu::Data {
1464 tpdu_nr: 0,
1465 last: true,
1466 payload: s7b.freeze(),
1467 };
1468 let mut cb = BytesMut::new();
1469 dt.encode(&mut cb);
1470 let mut tb = BytesMut::new();
1471 TpktFrame {
1472 payload: cb.freeze(),
1473 }
1474 .encode(&mut tb)
1475 .unwrap();
1476 server_io.write_all(&tb).await.unwrap();
1477 }
1478
1479 #[tokio::test]
1480 async fn db_read_returns_data() {
1481 let (client_io, server_io) = duplex(4096);
1482 let params = ConnectParams::default();
1483 let expected = vec![0xDE, 0xAD, 0xBE, 0xEF];
1484 tokio::spawn(mock_plc_db_read(server_io, expected.clone()));
1485 let client = S7Client::from_transport(client_io, params).await.unwrap();
1486 let data = client.db_read(1, 0, 4).await.unwrap();
1487 assert_eq!(&data[..], &expected[..]);
1488 }
1489
1490 async fn mock_plc_multi_read(
1492 mut server_io: tokio::io::DuplexStream,
1493 items: Vec<Vec<u8>>, ) {
1495 let mut buf = vec![0u8; 4096];
1496
1497 let _ = server_io.read(&mut buf).await;
1499 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1500 let mut cb = BytesMut::new();
1501 cc.encode(&mut cb);
1502 let mut tb = BytesMut::new();
1503 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1504 server_io.write_all(&tb).await.unwrap();
1505
1506 let _ = server_io.read(&mut buf).await;
1508 let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
1509 let mut s7b = BytesMut::new();
1510 S7Header {
1511 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1512 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1513 }.encode(&mut s7b);
1514 neg.encode(&mut s7b);
1515 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1516 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1517 let mut tb = BytesMut::new();
1518 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1519 server_io.write_all(&tb).await.unwrap();
1520
1521 let _ = server_io.read(&mut buf).await;
1523
1524 let item_count = items.len() as u8;
1526 let mut data_bytes = BytesMut::new();
1527 for item_data in &items {
1528 data_bytes.put_u8(0xFF); data_bytes.put_u8(0x04); data_bytes.put_u16((item_data.len() * 8) as u16);
1531 data_bytes.extend_from_slice(item_data);
1532 if item_data.len() % 2 != 0 {
1533 data_bytes.put_u8(0x00); }
1535 }
1536 let data_len = data_bytes.len() as u16;
1537 let mut s7b = BytesMut::new();
1538 S7Header {
1539 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
1540 param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
1541 }.encode(&mut s7b);
1542 s7b.extend_from_slice(&[0x04, item_count]); s7b.extend_from_slice(&data_bytes);
1544
1545 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1546 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1547 let mut tb = BytesMut::new();
1548 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1549 server_io.write_all(&tb).await.unwrap();
1550 }
1551
1552 #[tokio::test]
1553 async fn read_multi_vars_returns_all_items() {
1554 let (client_io, server_io) = duplex(4096);
1555 let params = ConnectParams::default();
1556 let item1 = vec![0xDE, 0xAD, 0xBE, 0xEF];
1557 let item2 = vec![0x01, 0x02];
1558 tokio::spawn(mock_plc_multi_read(server_io, vec![item1.clone(), item2.clone()]));
1559 let client = S7Client::from_transport(client_io, params).await.unwrap();
1560 let items = [MultiReadItem::db(1, 0, 4), MultiReadItem::db(2, 10, 2)];
1561 let results = client.read_multi_vars(&items).await.unwrap();
1562 assert_eq!(results.len(), 2);
1563 assert_eq!(&results[0][..], &item1[..]);
1564 assert_eq!(&results[1][..], &item2[..]);
1565 }
1566
1567 #[tokio::test]
1568 async fn read_multi_vars_empty_returns_empty() {
1569 let (client_io, server_io) = duplex(4096);
1570 let params = ConnectParams::default();
1571 tokio::spawn(mock_plc_multi_read(server_io, vec![]));
1572 let client = S7Client::from_transport(client_io, params).await.unwrap();
1573 let results = client.read_multi_vars(&[]).await.unwrap();
1574 assert!(results.is_empty());
1575 }
1576
1577 async fn mock_plc_multi_write(
1580 mut server_io: tokio::io::DuplexStream,
1581 pdu_size: u16,
1582 batches: Vec<usize>,
1583 ) {
1584 let mut buf = vec![0u8; 65536];
1585
1586 let _ = server_io.read(&mut buf).await;
1588 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1589 let mut cb = BytesMut::new(); cc.encode(&mut cb);
1590 let mut tb = BytesMut::new();
1591 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1592 server_io.write_all(&tb).await.unwrap();
1593
1594 let _ = server_io.read(&mut buf).await;
1596 let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: pdu_size };
1597 let mut s7b = BytesMut::new();
1598 S7Header {
1599 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1600 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1601 }.encode(&mut s7b);
1602 neg.encode(&mut s7b);
1603 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1604 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1605 let mut tb = BytesMut::new();
1606 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1607 server_io.write_all(&tb).await.unwrap();
1608
1609 for (i, item_count) in batches.iter().enumerate() {
1611 let _ = server_io.read(&mut buf).await;
1612 let mut s7b = BytesMut::new();
1614 S7Header {
1615 pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
1616 param_len: 2, data_len: *item_count as u16,
1617 error_class: Some(0), error_code: Some(0),
1618 }.encode(&mut s7b);
1619 s7b.extend_from_slice(&[0x05, *item_count as u8]); for _ in 0..*item_count {
1621 s7b.put_u8(0xFF); }
1623 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1624 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1625 let mut tb = BytesMut::new();
1626 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1627 server_io.write_all(&tb).await.unwrap();
1628 }
1629 }
1630
1631 #[tokio::test]
1632 async fn write_multi_vars_returns_ok() {
1633 let (client_io, server_io) = duplex(65536);
1634 let params = ConnectParams::default();
1635 tokio::spawn(mock_plc_multi_write(server_io, 480, vec![2]));
1636 let client = S7Client::from_transport(client_io, params).await.unwrap();
1637 let items = [
1638 MultiWriteItem::db(1, 0, vec![0xAA, 0xBB, 0xCC, 0xDD]),
1639 MultiWriteItem::db(2, 10, vec![0x01, 0x02]),
1640 ];
1641 client.write_multi_vars(&items).await.unwrap();
1642 }
1643
1644 #[tokio::test]
1645 async fn write_multi_vars_empty_returns_ok() {
1646 let (client_io, server_io) = duplex(4096);
1647 let params = ConnectParams::default();
1648 tokio::spawn(mock_plc_multi_write(server_io, 480, vec![]));
1650 let client = S7Client::from_transport(client_io, params).await.unwrap();
1651 client.write_multi_vars(&[]).await.unwrap();
1652 }
1653
1654 #[tokio::test]
1660 async fn write_multi_vars_batches_when_pdu_limit_exceeded() {
1661 let (client_io, server_io) = duplex(65536);
1662 let params = ConnectParams::default();
1663 tokio::spawn(mock_plc_multi_write(server_io, 64, vec![1, 1]));
1664 let client = S7Client::from_transport(client_io, params).await.unwrap();
1665 let items = [
1666 MultiWriteItem::db(1, 0, vec![0x11u8; 20]),
1667 MultiWriteItem::db(2, 0, vec![0x22u8; 20]),
1668 ];
1669 client.write_multi_vars(&items).await.unwrap();
1670 }
1671
1672 #[tokio::test]
1678 async fn read_multi_vars_batches_when_pdu_limit_exceeded() {
1679 use crate::proto::s7::negotiate::NegotiateResponse;
1680
1681 async fn mock_split_pdu(mut server_io: tokio::io::DuplexStream) {
1682 let mut buf = vec![0u8; 4096];
1683
1684 let _ = server_io.read(&mut buf).await;
1686 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1687 let mut cb = BytesMut::new(); cc.encode(&mut cb);
1688 let mut tb = BytesMut::new();
1689 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1690 server_io.write_all(&tb).await.unwrap();
1691
1692 let _ = server_io.read(&mut buf).await;
1694 let neg = NegotiateResponse {
1695 max_amq_calling: 1, max_amq_called: 1, pdu_length: 64,
1696 };
1697 let mut s7b = BytesMut::new();
1698 S7Header {
1699 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1700 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1701 }.encode(&mut s7b);
1702 neg.encode(&mut s7b);
1703 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1704 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1705 let mut tb = BytesMut::new();
1706 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1707 server_io.write_all(&tb).await.unwrap();
1708
1709 let payloads: &[&[u8]] = &[&[0x11u8; 30], &[0x22u8; 30]];
1711 for (i, payload) in payloads.iter().enumerate() {
1712 let _ = server_io.read(&mut buf).await;
1713 let bit_len = (payload.len() * 8) as u16;
1714 let mut data_bytes = BytesMut::new();
1715 data_bytes.put_u8(0xFF);
1716 data_bytes.put_u8(0x04);
1717 data_bytes.put_u16(bit_len);
1718 data_bytes.extend_from_slice(payload);
1719 if payload.len() % 2 != 0 { data_bytes.put_u8(0x00); }
1720 let data_len = data_bytes.len() as u16;
1721 let mut s7b = BytesMut::new();
1722 S7Header {
1723 pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
1724 param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
1725 }.encode(&mut s7b);
1726 s7b.extend_from_slice(&[0x04, 0x01]);
1727 s7b.extend_from_slice(&data_bytes);
1728 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1729 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1730 let mut tb = BytesMut::new();
1731 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1732 server_io.write_all(&tb).await.unwrap();
1733 }
1734 }
1735
1736 let (client_io, server_io) = duplex(4096);
1737 let params = ConnectParams::default();
1738 tokio::spawn(mock_split_pdu(server_io));
1739 let client = S7Client::from_transport(client_io, params).await.unwrap();
1740
1741 let items = [MultiReadItem::db(1, 0, 30), MultiReadItem::db(2, 0, 30)];
1742 let results = client.read_multi_vars(&items).await.unwrap();
1743 assert_eq!(results.len(), 2);
1744 assert_eq!(&results[0][..], &[0x11u8; 30][..]);
1745 assert_eq!(&results[1][..], &[0x22u8; 30][..]);
1746 }
1747
1748 async fn mock_handshake(server_io: &mut (impl AsyncRead + AsyncWrite + Unpin)) {
1752 let mut buf = vec![0u8; 4096];
1753
1754 let _ = server_io.read(&mut buf).await;
1756 let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1757 let mut cb = BytesMut::new(); cc.encode(&mut cb);
1758 let mut tb = BytesMut::new();
1759 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1760 server_io.write_all(&tb).await.unwrap();
1761
1762 let _ = server_io.read(&mut buf).await;
1764 let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
1765 let mut s7b = BytesMut::new();
1766 S7Header {
1767 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1768 param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1769 }.encode(&mut s7b);
1770 neg.encode(&mut s7b);
1771 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1772 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1773 let mut tb = BytesMut::new();
1774 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1775 server_io.write_all(&tb).await.unwrap();
1776 }
1777
1778 async fn mock_plc_control(
1781 mut server_io: tokio::io::DuplexStream,
1782 ok: bool,
1783 ) {
1784 let mut buf = vec![0u8; 4096];
1785 mock_handshake(&mut server_io).await;
1786
1787 let _ = server_io.read(&mut buf).await;
1789
1790 let (ec, ecd) = if ok { (0u8, 0u8) } else { (0x81u8, 0x04u8) };
1792 let mut s7b = BytesMut::new();
1793 S7Header {
1794 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
1795 param_len: 0, data_len: 0,
1796 error_class: Some(ec), error_code: Some(ecd),
1797 }.encode(&mut s7b);
1798 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1799 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1800 let mut tb = BytesMut::new();
1801 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1802 server_io.write_all(&tb).await.unwrap();
1803 }
1804
1805 #[tokio::test]
1806 async fn plc_stop_succeeds() {
1807 let (client_io, server_io) = duplex(4096);
1808 let params = ConnectParams::default();
1809 tokio::spawn(mock_plc_control(server_io, true));
1810 let client = S7Client::from_transport(client_io, params).await.unwrap();
1811 client.plc_stop().await.unwrap();
1812 }
1813
1814 #[tokio::test]
1815 async fn plc_hot_start_succeeds() {
1816 let (client_io, server_io) = duplex(4096);
1817 let params = ConnectParams::default();
1818 tokio::spawn(mock_plc_control(server_io, true));
1819 let client = S7Client::from_transport(client_io, params).await.unwrap();
1820 client.plc_hot_start().await.unwrap();
1821 }
1822
1823 #[tokio::test]
1824 async fn plc_cold_start_succeeds() {
1825 let (client_io, server_io) = duplex(4096);
1826 let params = ConnectParams::default();
1827 tokio::spawn(mock_plc_control(server_io, true));
1828 let client = S7Client::from_transport(client_io, params).await.unwrap();
1829 client.plc_cold_start().await.unwrap();
1830 }
1831
1832 #[tokio::test]
1833 async fn plc_stop_rejected_returns_error() {
1834 let (client_io, server_io) = duplex(4096);
1835 let params = ConnectParams::default();
1836 tokio::spawn(mock_plc_control(server_io, false));
1837 let client = S7Client::from_transport(client_io, params).await.unwrap();
1838 let result = client.plc_stop().await;
1839 assert!(result.is_err());
1840 }
1841
1842 async fn mock_plc_status(
1844 mut server_io: tokio::io::DuplexStream,
1845 status_byte: u8,
1846 ) {
1847 let mut buf = vec![0u8; 4096];
1848 mock_handshake(&mut server_io).await;
1849
1850 let _ = server_io.read(&mut buf).await;
1852
1853 let data = &[0x31u8, 0x00, status_byte]; let data_len = data.len() as u16;
1856 let mut s7b = BytesMut::new();
1857 S7Header {
1858 pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
1859 param_len: 2, data_len,
1860 error_class: Some(0), error_code: Some(0),
1861 }.encode(&mut s7b);
1862 s7b.extend_from_slice(data);
1863 let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1864 let mut cb = BytesMut::new(); dt.encode(&mut cb);
1865 let mut tb = BytesMut::new();
1866 TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1867 server_io.write_all(&tb).await.unwrap();
1868 }
1869
1870 #[tokio::test]
1871 async fn get_plc_status_returns_run() {
1872 let (client_io, server_io) = duplex(4096);
1873 let params = ConnectParams::default();
1874 tokio::spawn(mock_plc_status(server_io, 0x08));
1875 let client = S7Client::from_transport(client_io, params).await.unwrap();
1876 let status = client.get_plc_status().await.unwrap();
1877 assert_eq!(status, crate::types::PlcStatus::Run);
1878 }
1879
1880 #[tokio::test]
1881 async fn get_plc_status_returns_stop() {
1882 let (client_io, server_io) = duplex(4096);
1883 let params = ConnectParams::default();
1884 tokio::spawn(mock_plc_status(server_io, 0x04));
1885 let client = S7Client::from_transport(client_io, params).await.unwrap();
1886 let status = client.get_plc_status().await.unwrap();
1887 assert_eq!(status, crate::types::PlcStatus::Stop);
1888 }
1889
1890 #[tokio::test]
1891 async fn get_plc_status_returns_unknown() {
1892 let (client_io, server_io) = duplex(4096);
1893 let params = ConnectParams::default();
1894 tokio::spawn(mock_plc_status(server_io, 0x00));
1895 let client = S7Client::from_transport(client_io, params).await.unwrap();
1896 let status = client.get_plc_status().await.unwrap();
1897 assert_eq!(status, crate::types::PlcStatus::Unknown);
1898 }
1899
1900 #[tokio::test]
1901 async fn get_plc_status_unknown_byte_returns_error() {
1902 let (client_io, server_io) = duplex(4096);
1903 let params = ConnectParams::default();
1904 tokio::spawn(mock_plc_status(server_io, 0xFF));
1905 let client = S7Client::from_transport(client_io, params).await.unwrap();
1906 let result = client.get_plc_status().await;
1907 assert!(result.is_err());
1908 }
1909}