Skip to main content

snap7_client/
client.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::{
7    cotp::CotpPdu,
8    s7::{
9        clock::PlcDateTime,
10        header::{Area, PduType, S7Header, TransportSize},
11        read_var::{AddressItem, ReadVarRequest, ReadVarResponse},
12        szl::{SzlRequest, SzlResponse},
13        write_var::{WriteItem, WriteVarRequest, WriteVarResponse},
14    },
15    tpkt::TpktFrame,
16};
17
18use crate::{
19    connection::{connect, Connection},
20    error::{Error, Result},
21    types::ConnectParams,
22};
23
24/// A single item in a `read_multi_vars` request.
25#[derive(Debug, Clone)]
26pub struct MultiReadItem {
27    pub area: Area,
28    pub db_number: u16,
29    pub start: u32,
30    pub length: u16,
31    pub transport: TransportSize,
32}
33
34impl MultiReadItem {
35    /// Convenience constructor for a DataBlock byte read.
36    pub fn db(db: u16, start: u32, length: u16) -> Self {
37        Self {
38            area: Area::DataBlock,
39            db_number: db,
40            start,
41            length,
42            transport: TransportSize::Byte,
43        }
44    }
45}
46
47/// A single item in a `write_multi_vars` request.
48#[derive(Debug, Clone)]
49pub struct MultiWriteItem {
50    pub area: Area,
51    pub db_number: u16,
52    pub start: u32,
53    pub data: Bytes,
54}
55
56impl MultiWriteItem {
57    /// Convenience constructor for a DataBlock byte write.
58    pub fn db(db: u16, start: u32, data: impl Into<Bytes>) -> Self {
59        Self {
60            area: Area::DataBlock,
61            db_number: db,
62            start,
63            data: data.into(),
64        }
65    }
66}
67
68struct Inner<T> {
69    transport: T,
70    connection: Connection,
71    pdu_ref: u16,
72    request_timeout: std::time::Duration,
73}
74
75pub struct S7Client<T: AsyncRead + AsyncWrite + Unpin + Send> {
76    inner: Mutex<Inner<T>>,
77    params: ConnectParams,
78}
79
80impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7Client<T> {
81    pub async fn from_transport(transport: T, params: ConnectParams) -> Result<Self> {
82        let mut t = transport;
83        let connection = connect(&mut t, &params).await?;
84        let timeout = params.request_timeout;
85        Ok(S7Client {
86            inner: Mutex::new(Inner {
87                transport: t,
88                connection,
89                pdu_ref: 1,
90                request_timeout: timeout,
91            }),
92            params,
93        })
94    }
95
96    /// Return the current request timeout.
97    pub fn request_timeout(&self) -> std::time::Duration {
98        self.params.request_timeout
99    }
100
101    /// Update the request timeout at runtime.
102    ///
103    /// This affects subsequent `recv_s7` calls made by this client instance.
104    pub async fn set_request_timeout(&self, timeout: std::time::Duration) {
105        let mut inner = self.inner.lock().await;
106        inner.request_timeout = timeout;
107    }
108
109    /// Read a client parameter by name.
110    ///
111    /// Supported names: `"request_timeout"`, `"connect_timeout"`, `"pdu_size"`.
112    pub fn get_param(&self, name: &str) -> Result<std::time::Duration> {
113        match name {
114            "request_timeout" => Ok(self.params.request_timeout),
115            "connect_timeout" => Ok(self.params.connect_timeout),
116            "pdu_size" => Err(Error::PlcError {
117                code: 0,
118                message: "pdu_size is not a Duration; use .params.pdu_size directly".into(),
119            }),
120            _ => Err(Error::PlcError {
121                code: 0,
122                message: format!("unknown parameter: {name}"),
123            }),
124        }
125    }
126
127    /// Set a client parameter at runtime.
128    ///
129    /// Supported names: `"request_timeout"` (Duration).
130    pub fn set_param(&mut self, name: &str, value: std::time::Duration) -> Result<()> {
131        match name {
132            "request_timeout" => {
133                self.params.request_timeout = value;
134                Ok(())
135            }
136            _ => Err(Error::PlcError {
137                code: 0,
138                message: format!("unknown parameter: {name}"),
139            }),
140        }
141    }
142
143    fn next_pdu_ref(inner: &mut Inner<T>) -> u16 {
144        inner.pdu_ref = inner.pdu_ref.wrapping_add(1);
145        inner.pdu_ref
146    }
147
148    async fn send_s7(
149        inner: &mut Inner<T>,
150        param_buf: Bytes,
151        data_buf: Bytes,
152        pdu_ref: u16,
153        pdu_type: PduType,
154    ) -> Result<()> {
155        let header = S7Header {
156            pdu_type,
157            reserved: 0,
158            pdu_ref,
159            param_len: param_buf.len() as u16,
160            data_len: data_buf.len() as u16,
161            error_class: None,
162            error_code: None,
163        };
164        let mut s7b = BytesMut::new();
165        header.encode(&mut s7b);
166        s7b.extend_from_slice(&param_buf);
167        s7b.extend_from_slice(&data_buf);
168
169        let dt = CotpPdu::Data {
170            tpdu_nr: 0,
171            last: true,
172            payload: s7b.freeze(),
173        };
174        let mut cotpb = BytesMut::new();
175        dt.encode(&mut cotpb);
176        let tpkt = TpktFrame {
177            payload: cotpb.freeze(),
178        };
179        let mut tb = BytesMut::new();
180        tpkt.encode(&mut tb)?;
181        inner.transport.write_all(&tb).await?;
182        Ok(())
183    }
184
185    async fn recv_s7(inner: &mut Inner<T>) -> Result<(S7Header, Bytes)> {
186        let timeout = inner.request_timeout;
187        let mut tpkt_hdr = [0u8; 4];
188        tokio::time::timeout(timeout, inner.transport.read_exact(&mut tpkt_hdr))
189            .await
190            .map_err(|_| Error::Timeout(timeout))??;
191        let total = u16::from_be_bytes([tpkt_hdr[2], tpkt_hdr[3]]) as usize;
192        if total < 4 {
193            return Err(Error::UnexpectedResponse);
194        }
195        let mut payload = vec![0u8; total - 4];
196        tokio::time::timeout(timeout, inner.transport.read_exact(&mut payload))
197            .await
198            .map_err(|_| Error::Timeout(timeout))??;
199        let mut b = Bytes::from(payload);
200
201        // COTP DT header: LI (1) + code (1) + tpdu_nr (1)
202        if b.remaining() < 3 {
203            return Err(Error::UnexpectedResponse);
204        }
205        let _li = b.get_u8();
206        let cotp_code = b.get_u8();
207        if cotp_code != 0xF0 {
208            return Err(Error::UnexpectedResponse);
209        }
210        b.advance(1); // tpdu_nr byte
211
212        let header = S7Header::decode(&mut b)?;
213        Ok((header, b))
214    }
215
216    pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes> {
217        let mut inner = self.inner.lock().await;
218        let pdu_ref = Self::next_pdu_ref(&mut inner);
219
220        let req = ReadVarRequest {
221            items: vec![AddressItem {
222                area: Area::DataBlock,
223                db_number: db,
224                start,
225                bit_offset: 0,
226                length,
227                transport: TransportSize::Byte,
228            }],
229        };
230        let mut param_buf = BytesMut::new();
231        req.encode(&mut param_buf);
232
233        Self::send_s7(
234            &mut inner,
235            param_buf.freeze(),
236            Bytes::new(),
237            pdu_ref,
238            PduType::Job,
239        )
240        .await?;
241
242        let (header, mut body) = Self::recv_s7(&mut inner).await?;
243        check_plc_error(&header, "db_read")?;
244        if body.remaining() >= 2 {
245            body.advance(2); // skip param echo: func + item count
246        }
247        let resp = ReadVarResponse::decode(&mut body, 1)?;
248        if resp.items.is_empty() {
249            return Err(Error::UnexpectedResponse);
250        }
251        if resp.items[0].return_code != 0xFF {
252            return Err(Error::PlcError {
253                code: resp.items[0].return_code as u32,
254                message: "item error".into(),
255            });
256        }
257        Ok(resp.items[0].data.clone())
258    }
259
260    /// Read multiple PLC regions in one or more S7 PDU exchanges.
261    ///
262    /// Automatically batches items when the item count would exceed the Siemens hard
263    /// limit of 20 per PDU, or when the encoded request or response would exceed the
264    /// negotiated PDU size. Returns one `Bytes` per item in input order.
265    ///
266    /// Unlike `db_read`, this accepts any `Area` and `TransportSize`.
267    pub async fn read_multi_vars(&self, items: &[MultiReadItem]) -> Result<Vec<Bytes>> {
268        if items.is_empty() {
269            return Ok(Vec::new());
270        }
271
272        // PDU size constants (in bytes)
273        // S7 header: 10, func+count: 2, per-item address: 12
274        const S7_HEADER: usize = 10;
275        const PARAM_OVERHEAD: usize = 2; // func + item count
276        const ADDR_ITEM_SIZE: usize = 12;
277        // Response data item: 4 header + data + 0/1 pad
278        const DATA_ITEM_OVERHEAD: usize = 4;
279        const MAX_ITEMS_PER_PDU: usize = 20;
280
281        let mut inner = self.inner.lock().await;
282        let pdu_size = inner.connection.pdu_size as usize;
283        let max_req_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
284        let max_resp_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
285
286        let mut results = vec![Bytes::new(); items.len()];
287        let mut batch_start = 0;
288
289        while batch_start < items.len() {
290            // Build a batch that fits within PDU limits
291            let mut batch_end = batch_start;
292            let mut req_bytes_used = 0usize;
293            let mut resp_bytes_used = 0usize;
294
295            while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
296                let item = &items[batch_end];
297                let item_resp_size =
298                    DATA_ITEM_OVERHEAD + item.length as usize + (item.length as usize % 2);
299
300                if batch_end > batch_start
301                    && (req_bytes_used + ADDR_ITEM_SIZE > max_req_payload
302                        || resp_bytes_used + item_resp_size > max_resp_payload)
303                {
304                    break;
305                }
306                req_bytes_used += ADDR_ITEM_SIZE;
307                resp_bytes_used += item_resp_size;
308                batch_end += 1;
309            }
310
311            let batch = &items[batch_start..batch_end];
312            let pdu_ref = Self::next_pdu_ref(&mut inner);
313
314            let req = ReadVarRequest {
315                items: batch
316                    .iter()
317                    .map(|item| AddressItem {
318                        area: item.area,
319                        db_number: item.db_number,
320                        start: item.start,
321                        bit_offset: 0,
322                        // Siemens requires Byte transport + byte-count length in the request.
323                        // The item's declared transport is only used to decode the response.
324                        length: item.length,
325                        transport: TransportSize::Byte,
326                    })
327                    .collect(),
328            };
329            let mut param_buf = BytesMut::new();
330            req.encode(&mut param_buf);
331
332            Self::send_s7(
333                &mut inner,
334                param_buf.freeze(),
335                Bytes::new(),
336                pdu_ref,
337                PduType::Job,
338            )
339            .await?;
340
341            let (header, mut body) = Self::recv_s7(&mut inner).await?;
342            check_plc_error(&header, "read_multi_vars")?;
343            if body.remaining() >= 2 {
344                body.advance(2); // skip func + item_count echo
345            }
346            let resp = ReadVarResponse::decode(&mut body, batch.len())?;
347
348            for (i, item) in resp.items.into_iter().enumerate() {
349                if item.return_code != 0xFF {
350                    return Err(Error::PlcError {
351                        code: item.return_code as u32,
352                        message: format!("item {} error", batch_start + i),
353                    });
354                }
355                results[batch_start + i] = item.data;
356            }
357
358            batch_start = batch_end;
359        }
360
361        Ok(results)
362    }
363
364    /// Write multiple PLC regions in one or more S7 PDU exchanges.
365    ///
366    /// Automatically batches items when the count or encoded size would exceed the
367    /// negotiated PDU size or the Siemens hard limit of 20 items per PDU.
368    /// Returns `Ok(())` only when all items are acknowledged with return code 0xFF.
369    pub async fn write_multi_vars(&self, items: &[MultiWriteItem]) -> Result<()> {
370        if items.is_empty() {
371            return Ok(());
372        }
373
374        const S7_HEADER: usize = 10;
375        const PARAM_OVERHEAD: usize = 2; // func + item count
376        const ADDR_ITEM_SIZE: usize = 12;
377        const DATA_ITEM_OVERHEAD: usize = 4; // reserved + transport + bit_len (2)
378        const MAX_ITEMS_PER_PDU: usize = 20;
379
380        let mut inner = self.inner.lock().await;
381        let pdu_size = inner.connection.pdu_size as usize;
382        let max_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
383
384        let mut batch_start = 0;
385
386        while batch_start < items.len() {
387            let mut batch_end = batch_start;
388            let mut bytes_used = 0usize;
389
390            while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
391                let item = &items[batch_end];
392                let data_len = item.data.len();
393                let item_size = ADDR_ITEM_SIZE + DATA_ITEM_OVERHEAD + data_len + (data_len % 2);
394
395                if batch_end > batch_start && bytes_used + item_size > max_payload {
396                    break;
397                }
398                bytes_used += item_size;
399                batch_end += 1;
400            }
401
402            let batch = &items[batch_start..batch_end];
403            let pdu_ref = Self::next_pdu_ref(&mut inner);
404
405            let req = WriteVarRequest {
406                items: batch
407                    .iter()
408                    .map(|item| WriteItem {
409                        address: AddressItem {
410                            area: item.area,
411                            db_number: item.db_number,
412                            start: item.start,
413                            bit_offset: 0,
414                            length: item.data.len() as u16,
415                            transport: TransportSize::Byte,
416                        },
417                        data: item.data.clone(),
418                    })
419                    .collect(),
420            };
421            let mut param_buf = BytesMut::new();
422            req.encode(&mut param_buf);
423
424            Self::send_s7(
425                &mut inner,
426                param_buf.freeze(),
427                Bytes::new(),
428                pdu_ref,
429                PduType::Job,
430            )
431            .await?;
432
433            let (header, mut body) = Self::recv_s7(&mut inner).await?;
434            check_plc_error(&header, "write_multi_vars")?;
435            if body.remaining() >= 2 {
436                body.advance(2); // skip func + item_count echo
437            }
438            let resp = WriteVarResponse::decode(&mut body, batch.len())?;
439            for (i, &code) in resp.return_codes.iter().enumerate() {
440                if code != 0xFF {
441                    return Err(Error::PlcError {
442                        code: code as u32,
443                        message: format!("item {} write error", batch_start + i),
444                    });
445                }
446            }
447
448            batch_start = batch_end;
449        }
450
451        Ok(())
452    }
453
454    pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<()> {
455        let mut inner = self.inner.lock().await;
456        let pdu_ref = Self::next_pdu_ref(&mut inner);
457
458        let req = WriteVarRequest {
459            items: vec![WriteItem {
460                address: AddressItem {
461                    area: Area::DataBlock,
462                    db_number: db,
463                    start,
464                    bit_offset: 0,
465                    length: data.len() as u16,
466                    transport: TransportSize::Byte,
467                },
468                data: Bytes::copy_from_slice(data),
469            }],
470        };
471        let mut param_buf = BytesMut::new();
472        req.encode(&mut param_buf);
473
474        Self::send_s7(
475            &mut inner,
476            param_buf.freeze(),
477            Bytes::new(),
478            pdu_ref,
479            PduType::Job,
480        )
481        .await?;
482
483        let (header, mut body) = Self::recv_s7(&mut inner).await?;
484        check_plc_error(&header, "db_write")?;
485        if body.has_remaining() {
486            body.advance(2); // skip func + item count
487        }
488        let resp = WriteVarResponse::decode(&mut body, 1)?;
489        if resp.return_codes[0] != 0xFF {
490            return Err(Error::PlcError {
491                code: resp.return_codes[0] as u32,
492                message: "write error".into(),
493            });
494        }
495        Ok(())
496    }
497
498    /// Read from any PLC area using absolute addressing.
499    ///
500    /// A convenience wrapper around [`read_multi_vars`](Self::read_multi_vars)
501    /// for a single area read.
502    pub async fn ab_read(
503        &self,
504        area: Area,
505        db_number: u16,
506        start: u32,
507        length: u16,
508    ) -> Result<Bytes> {
509        let items = [MultiReadItem {
510            area,
511            db_number,
512            start,
513            length,
514            transport: TransportSize::Byte,
515        }];
516        let mut results = self.read_multi_vars(&items).await?;
517        Ok(results.swap_remove(0))
518    }
519
520    /// Write to any PLC area using absolute addressing.
521    ///
522    /// A convenience wrapper around [`write_multi_vars`](Self::write_multi_vars)
523    /// for a single area write.
524    pub async fn ab_write(
525        &self,
526        area: Area,
527        db_number: u16,
528        start: u32,
529        data: &[u8],
530    ) -> Result<()> {
531        let items = [MultiWriteItem {
532            area,
533            db_number,
534            start,
535            data: Bytes::copy_from_slice(data),
536        }];
537        self.write_multi_vars(&items).await
538    }
539
540    pub async fn read_szl(&self, szl_id: u16, szl_index: u16) -> Result<SzlResponse> {
541        let payload = self.read_szl_payload(szl_id, szl_index).await?;
542        let mut b = payload;
543        Ok(SzlResponse::decode(&mut b)?)
544    }
545
546    /// Send a UserData SZL query and return the raw SZL data block
547    /// (starting with block_len, szl_id, szl_index, then entry data).
548    async fn read_szl_payload(&self, szl_id: u16, szl_index: u16) -> Result<Bytes> {
549        let mut inner = self.inner.lock().await;
550        let pdu_ref = Self::next_pdu_ref(&mut inner);
551
552        let req = SzlRequest { szl_id, szl_index };
553        let mut param_buf = BytesMut::new();
554        req.encode_params(&mut param_buf);
555        let mut data_buf = BytesMut::new();
556        req.encode_data(&mut data_buf);
557
558        Self::send_s7(
559            &mut inner,
560            param_buf.freeze(),
561            data_buf.freeze(),
562            pdu_ref,
563            PduType::UserData,
564        )
565        .await?;
566
567        let (header, mut body) = Self::recv_s7(&mut inner).await?;
568
569        // Skip the echoed param section
570        if body.remaining() < header.param_len as usize {
571            return Err(Error::UnexpectedResponse);
572        }
573        body.advance(header.param_len as usize);
574
575        // body is now the data section.
576        // Data envelope: return_code(1) + transport(1) + data_len(2)
577        // If shorter than 4, the PLC returned an error with no data.
578        if body.remaining() < 4 {
579            return Ok(Bytes::new());
580        }
581        let return_code = body.get_u8();
582        let _transport = body.get_u8();
583        let _data_len = body.get_u16();
584
585        // return_code 0xFF = success; anything else = PLC error (function not available etc.)
586        // Return empty payload so callers can handle gracefully.
587        if return_code != 0xFF {
588            return Ok(Bytes::new());
589        }
590
591        // Remaining is the SZL data block.
592        Ok(body.copy_to_bytes(body.remaining()))
593    }
594
595    pub async fn read_clock(&self) -> Result<PlcDateTime> {
596        let mut inner = self.inner.lock().await;
597        let pdu_ref = Self::next_pdu_ref(&mut inner);
598        let mut param_buf = BytesMut::new();
599        param_buf.extend_from_slice(&[0x00, 0x01, 0x12, 0x04, 0xF5, 0x00]);
600        Self::send_s7(
601            &mut inner,
602            param_buf.freeze(),
603            Bytes::new(),
604            pdu_ref,
605            PduType::UserData,
606        )
607        .await?;
608        let (_header, mut body) = Self::recv_s7(&mut inner).await?;
609        if body.remaining() > 8 {
610            body.advance(body.remaining() - 8);
611        }
612        Ok(PlcDateTime::decode(&mut body)?)
613    }
614
615    /// Copy RAM data to ROM (function 0x43).
616    ///
617    /// Copies the CPU's work memory to its load memory (retain on power-off).
618    pub async fn copy_ram_to_rom(&self) -> Result<()> {
619        let mut inner = self.inner.lock().await;
620        let pdu_ref = Self::next_pdu_ref(&mut inner);
621        let param = Bytes::copy_from_slice(&[
622            0x00, 0x01, 0x12, 0x04, 0x43, 0x44, 0x01, 0x00,
623        ]);
624        Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::UserData).await?;
625        let (header, _body) = Self::recv_s7(&mut inner).await?;
626        check_plc_error(&header, "copy_ram_to_rom")?;
627        Ok(())
628    }
629
630    /// Compress the PLC work memory (function 0x42).
631    ///
632    /// Reorganises memory to eliminate fragmentation.  The PLC must be in STOP
633    /// mode before calling this.
634    pub async fn compress(&self) -> Result<()> {
635        let mut inner = self.inner.lock().await;
636        let pdu_ref = Self::next_pdu_ref(&mut inner);
637        let param = Bytes::copy_from_slice(&[
638            0x00, 0x01, 0x12, 0x04, 0x42, 0x44, 0x01, 0x00,
639        ]);
640        Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::UserData).await?;
641        let (header, _body) = Self::recv_s7(&mut inner).await?;
642        check_plc_error(&header, "compress")?;
643        Ok(())
644    }
645
646    // -- PLC control & status -------------------------------------------------
647
648    /// Send a simple Job with a 2-byte parameter (func + 0x00) and no data.
649    async fn simple_control(inner: &mut Inner<T>, pdu_ref: u16, func: u8) -> Result<()> {
650        let param = Bytes::copy_from_slice(&[func, 0x00]);
651        Self::send_s7(inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
652        let (header, _body) = Self::recv_s7(inner).await?;
653        check_plc_error(&header, "plc_control")?;
654        Ok(())
655    }
656
657    /// Stop the PLC (S7 function code 0x29).
658    ///
659    /// Sends a Job request with no additional data. Returns `Ok(())` when the
660    /// PLC acknowledges the command, or an error if the PLC rejects it
661    /// (e.g., password-protected or CPU in a non-stoppable state).
662    pub async fn plc_stop(&self) -> Result<()> {
663        let mut inner = self.inner.lock().await;
664        let pdu_ref = Self::next_pdu_ref(&mut inner);
665        Self::simple_control(&mut inner, pdu_ref, 0x29).await
666    }
667
668    /// Hot-start (warm restart) the PLC (S7 function code 0x28).
669    ///
670    /// A warm restart retains the DB content and retentive memory.
671    pub async fn plc_hot_start(&self) -> Result<()> {
672        let mut inner = self.inner.lock().await;
673        let pdu_ref = Self::next_pdu_ref(&mut inner);
674        Self::simple_control(&mut inner, pdu_ref, 0x28).await
675    }
676
677    /// Cold-start (full restart) the PLC (S7 function code 0x2A).
678    ///
679    /// A cold start clears all DBs and non-retentive memory.
680    pub async fn plc_cold_start(&self) -> Result<()> {
681        let mut inner = self.inner.lock().await;
682        let pdu_ref = Self::next_pdu_ref(&mut inner);
683        Self::simple_control(&mut inner, pdu_ref, 0x2A).await
684    }
685
686    /// Read the current PLC status (S7 function code 0x31).
687    ///
688    /// Returns one of [`PlcStatus::Run`], [`PlcStatus::Stop`], or
689    /// [`PlcStatus::Unknown`].
690    pub async fn get_plc_status(&self) -> Result<crate::types::PlcStatus> {
691        let mut inner = self.inner.lock().await;
692        let pdu_ref = Self::next_pdu_ref(&mut inner);
693        let param = Bytes::copy_from_slice(&[0x31, 0x00]);
694        Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
695        let (header, mut body) = Self::recv_s7(&mut inner).await?;
696        check_plc_error(&header, "get_plc_status")?;
697        // Skip param echo: func (1) + reserved (1)
698        if body.remaining() >= 2 {
699            body.advance(2);
700        }
701        if body.remaining() < 1 {
702            return Err(Error::UnexpectedResponse);
703        }
704        let status_byte = body.get_u8();
705        match status_byte {
706            0x00 => Ok(crate::types::PlcStatus::Unknown),
707            0x04 => Ok(crate::types::PlcStatus::Stop),
708            0x08 => Ok(crate::types::PlcStatus::Run),
709            other => Err(Error::PlcError {
710                code: other as u32,
711                message: format!("unknown PLC status byte: 0x{other:02X}"),
712            }),
713        }
714    }
715
716    // -- PLC information queries (via SZL UserData) ---------------------------
717
718    /// Read the PLC order code (SZL ID 0x0011).
719    ///
720    /// The order code is a 20-character ASCII string (e.g. `"6ES7 317-2EK14-0AB0"`).
721    pub async fn get_order_code(&self) -> Result<crate::types::OrderCode> {
722        let payload = self.read_szl_payload(0x0011, 0x0000).await?;
723        if payload.len() < 8 {
724            return Err(Error::UnexpectedResponse);
725        }
726
727        // SZL 0x0011 payload: [szl_id:2][szl_index:2][entry_len:2][entry_count:2][entries...]
728        // Each entry: [index:2][data: entry_len-2 bytes, null-padded]
729        // Entry 0x0001 = order code string; version bytes = last 3 bytes of entire payload.
730        let n = payload.len();
731        let (v1, v2, v3) = if n >= 3 {
732            (payload[n - 3], payload[n - 2], payload[n - 1])
733        } else {
734            (0, 0, 0)
735        };
736
737        let mut b = payload.clone();
738        let szl_id = b.get_u16();
739        let _szl_idx = b.get_u16();
740        let entry_len = b.get_u16() as usize;
741        let entry_count = b.get_u16() as usize;
742
743        if (szl_id == 0x0011 || szl_id == 0x001C) && entry_len >= 4 && entry_count > 0 {
744            for _ in 0..entry_count {
745                if b.remaining() < entry_len { break; }
746                let entry_idx = b.get_u16();
747                let string_len = entry_len - 2;
748                let raw = b.copy_to_bytes(string_len);
749                if entry_idx == 0x0001 {
750                    let null_end = raw.iter().position(|&x| x == 0).unwrap_or(string_len);
751                    let code = String::from_utf8_lossy(&raw[..null_end]).trim().to_string();
752                    if !code.is_empty() {
753                        return Ok(crate::types::OrderCode { code, v1, v2, v3 });
754                    }
755                }
756            }
757        }
758
759        // Fallback: scan for "6ES"/"6AV"/"6GK" pattern anywhere in payload.
760        let code = scan_ascii_fields(&payload, 10, 4).into_iter().find(|s| {
761            let su = s.to_uppercase();
762            (su.starts_with("6ES") || su.starts_with("6AV") || su.starts_with("6GK"))
763                && s.len() >= 10
764                && s.bytes().all(|c| c.is_ascii_graphic() || c == b' ')
765        }).unwrap_or_default();
766        Ok(crate::types::OrderCode { code, v1, v2, v3 })
767    }
768
769    /// Read detailed CPU information (SZL ID 0x001C).
770    ///
771    /// Returns module type, serial number, plant identification, copyright
772    /// and module name fields pre-parsed from the SZL response.
773    /// Handles both classic S7-300/400 and S7-1200/1500 response formats.
774    pub async fn get_cpu_info(&self) -> Result<crate::types::CpuInfo> {
775        let payload = self.read_szl_payload(0x001C, 0x0000).await?;
776        if payload.len() < 8 {
777            return Err(Error::UnexpectedResponse);
778        }
779
780        // SZL 0x001C payload layout (after correct request framing):
781        //   [szl_id:2=0x001C][szl_index:2][entry_len:2][entry_count:2]
782        //   followed by entry_count entries, each entry_len bytes:
783        //     [entry_index:2][string_data: entry_len-2 bytes, null-padded]
784        //
785        // Entry indices observed on S7-300/400:
786        //   0x0001 = plant identification (AS name)
787        //   0x0002 = module type name (e.g. "CPU 319-3 PN/DP")
788        //   0x0003 = module name (OB1 program name)
789        //   0x0004 = copyright
790        //   0x0005 = serial number
791        //   0x0007 = module type name (duplicate in some firmware)
792        //   0x0008 = module name (duplicate in some firmware)
793        let mut b = payload.clone();
794        let szl_id = b.get_u16();
795        let _szl_idx = b.get_u16();
796        let entry_len = b.get_u16() as usize;
797        let entry_count = b.get_u16() as usize;
798
799        if szl_id == 0x001C && entry_len >= 4 && entry_count > 0 {
800            let mut module_type = String::new();
801            let mut module_type_canonical = String::new(); // index 0x0007 — always authoritative
802            let mut serial_number = String::new();
803            let mut as_name = String::new();
804            let mut copyright = String::new();
805            let mut module_name = String::new();
806
807            for _ in 0..entry_count {
808                if b.remaining() < entry_len { break; }
809                let entry_idx = b.get_u16();
810                let string_len = entry_len - 2;
811                let raw = b.copy_to_bytes(string_len);
812                let null_end = raw.iter().position(|&x| x == 0).unwrap_or(string_len);
813                let val = String::from_utf8_lossy(&raw[..null_end]).trim().to_string();
814                match entry_idx {
815                    0x0001 => { if as_name.is_empty() { as_name = val; } }
816                    // 0x0002 is module type on S7-300, AS name on S7-1500 — only use if
817                    // 0x0007 is absent (module_type_canonical will override below).
818                    0x0002 => { if module_type.is_empty() { module_type = val; } }
819                    0x0003 => { if module_name.is_empty() { module_name = val; } }
820                    0x0004 => { if copyright.is_empty() { copyright = val; } }
821                    0x0005 => { if serial_number.is_empty() { serial_number = val; } }
822                    // 0x0007 is always the true module type name (both S7-300 and S7-1500)
823                    0x0007 => { if module_type_canonical.is_empty() { module_type_canonical = val; } }
824                    // 0x0008 is SMC memory card on S7-1500 — do not use for module_name
825                    _ => {}
826                }
827            }
828
829            // 0x0007 wins over 0x0002 for module_type
830            if !module_type_canonical.is_empty() {
831                module_type = module_type_canonical;
832            }
833
834            if module_name.is_empty() && !as_name.is_empty() {
835                module_name = as_name.clone();
836            }
837
838            if !module_type.is_empty() || !serial_number.is_empty() || !as_name.is_empty() {
839                let protocol = detect_protocol(&payload, &module_type);
840                return Ok(crate::types::CpuInfo {
841                    module_type,
842                    serial_number,
843                    as_name,
844                    copyright,
845                    module_name,
846                    protocol,
847                });
848            }
849        }
850
851        // S7-1500 and some firmware variants use a tagged sub-record format.
852        // Fall back to scanning the raw payload for tagged string fields.
853        let data = payload.as_ref();
854        let (module_type, serial_number, as_name, copyright, module_name) =
855            parse_sub_record_fields(data);
856
857        if !module_type.is_empty() || !serial_number.is_empty() {
858            let protocol = detect_protocol(&payload, &module_type);
859            return Ok(crate::types::CpuInfo {
860                module_type,
861                serial_number,
862                as_name,
863                copyright,
864                module_name,
865                protocol,
866            });
867        }
868
869        // Last-resort scan: extract printable strings and apply heuristics.
870        let mut module_type = String::new();
871        let mut serial_number = String::new();
872        let mut as_name = String::new();
873        let mut copyright = String::new();
874        let mut module_name = String::new();
875
876        let mut scan = 0;
877        while scan < data.len() {
878            if data[scan].is_ascii_graphic() || data[scan] == b' ' {
879                let start = scan;
880                while scan < data.len() && (data[scan].is_ascii_graphic() || data[scan] == b' ') {
881                    scan += 1;
882                }
883                let val = String::from_utf8_lossy(&data[start..scan]).trim().to_string();
884                if val.len() >= 3 {
885                    let tag = if start >= 2 && data[start - 2] == 0x00 {
886                        Some(data[start - 1])
887                    } else {
888                        None
889                    };
890                    let su = val.to_uppercase();
891                    if su.contains("BOOT") || su.starts_with("P B") || su.starts_with("HBOOT") {
892                        // skip firmware label
893                    } else if tag == Some(0x07) && module_type.is_empty() {
894                        module_type = val;
895                    } else if tag == Some(0x08) && module_name.is_empty() {
896                        module_name = val;
897                    } else if tag == Some(0x05) && as_name.is_empty() {
898                        as_name = val;
899                    } else if tag == Some(0x06) && copyright.is_empty() {
900                        copyright = val;
901                    } else if tag == Some(0x04) && serial_number.is_empty() {
902                        serial_number = val;
903                    } else if val.contains('-')
904                        && val.chars().filter(|c| c.is_ascii_digit()).count() >= 4
905                        && !val.starts_with("6ES7")
906                        && serial_number.is_empty()
907                    {
908                        serial_number = val;
909                    } else if su.contains("CPU") && su.contains("PN") && module_type.is_empty() {
910                        module_type = val;
911                    } else if module_type.is_empty() && val.len() >= 8 && !su.contains("MC_") {
912                        module_type = val;
913                    }
914                }
915            } else {
916                scan += 1;
917            }
918        }
919
920        let protocol = detect_protocol(&payload, &module_type);
921        Ok(crate::types::CpuInfo {
922            module_type,
923            serial_number,
924            as_name,
925            copyright,
926            module_name,
927            protocol,
928        })
929    }
930    
931    /// Read communication processor information (SZL ID 0x0131, index 0x0001).
932    ///
933    /// Returns maximum PDU length, connection count, and baud rates.
934    pub async fn get_cp_info(&self) -> Result<crate::types::CpInfo> {
935        // Index 0x0001 = communication module info entry (used by C snap7).
936        let payload = self.read_szl_payload(0x0131, 0x0001).await?;
937
938        // SZL 0x0131 response wire format (after stripping the 4-byte data envelope):
939        //   [szl_id:2][szl_index:2][entry_len:2][entry_count:2][entries...]
940        // Each entry for index 0x0001 (S7-300/400/1200/1500):
941        //   [index:2][max_pdu_len:2][max_connections:2][max_mpi_rate:4][max_bus_rate:4] = 14 bytes
942
943        let mut b = payload.clone();
944        if b.remaining() < 8 {
945            return Ok(crate::types::CpInfo {
946                max_pdu_len: 0, max_connections: 0, max_mpi_rate: 0, max_bus_rate: 0,
947            });
948        }
949
950        let szl_id = b.get_u16();
951        let _szl_idx = b.get_u16();
952        let entry_len = b.get_u16() as usize;
953        let entry_count = b.get_u16() as usize;
954
955        // Classic format (S7-300/400/1200): szl_id=0x0131, entries with 14-byte records
956        if szl_id == 0x0131 && entry_len >= 12 && entry_count >= 1 && b.remaining() >= entry_len {
957            let _entry_idx = b.get_u16();
958            let max_pdu_len = b.get_u16() as u32;
959            let max_connections = b.get_u16() as u32;
960            let max_mpi_rate = b.get_u32();
961            let max_bus_rate = b.get_u32();
962            return Ok(crate::types::CpInfo {
963                max_pdu_len,
964                max_connections,
965                max_mpi_rate,
966                max_bus_rate,
967            });
968        }
969
970        // Fallback: scan for any parseable numeric data
971        Ok(crate::types::CpInfo {
972            max_pdu_len: 0,
973            max_connections: 0,
974            max_mpi_rate: 0,
975            max_bus_rate: 0,
976        })
977    }
978
979    /// Read the rack module list (SZL ID 0x00A0).
980    ///
981    /// Each entry is a 2-byte module type identifier.
982    pub async fn read_module_list(&self) -> Result<Vec<crate::types::ModuleEntry>> {
983        let payload = self.read_szl_payload(0x00A0, 0x0000).await?;
984        if payload.len() < 6 {
985            return Err(Error::UnexpectedResponse);
986        }
987        let mut b = payload;
988        let _block_len = b.get_u16();
989        let _szl_id = b.get_u16();
990        let _szl_ix = b.get_u16();
991        // Skip the optional SZL entry_length prefix (2 bytes).
992        skip_szl_entry_header(&mut b);
993        let mut modules = Vec::new();
994        while b.remaining() >= 2 {
995            modules.push(crate::types::ModuleEntry {
996                module_type: b.get_u16(),
997            });
998        }
999        Ok(modules)
1000    }
1001
1002    // -- Block list & block info (via SZL + UserData) -------------------------
1003
1004    /// List all blocks in the PLC grouped by type (SZL 0x0130).
1005    ///
1006    /// Returns a [`BlockList`] with the total block count and per-type entries.
1007    pub async fn list_blocks(&self) -> Result<crate::types::BlockList> {
1008        let payload = self.read_szl_payload(0x0130, 0x0000).await?;
1009        if payload.len() < 10 {
1010            return Err(Error::UnexpectedResponse);
1011        }
1012        let mut b = payload;
1013        let _block_len = b.get_u16();
1014        let _resp_szl_id = b.get_u16();
1015        let _szl_ix = b.get_u16();
1016
1017        // S7-1500 format: strip sub-block header and entry prefixes.
1018        let mut szl_data = b;
1019        if szl_data.len() >= 2 && szl_data[0] == 0x04
1020            && (szl_data[1] == 0x02 || szl_data[1] == 0x03)
1021        {
1022            szl_data.advance(2);
1023            while szl_data.len() >= 4
1024                && szl_data[0] == 0xFF && szl_data[1] == 0x04
1025            {
1026                let entry_len = u16::from_be_bytes([szl_data[2], szl_data[3]]) as usize;
1027                let skip = 4 + entry_len;
1028                if skip > szl_data.len() { break; }
1029                szl_data.advance(skip);
1030            }
1031        }
1032        // Skip the optional SZL entry_length prefix (2 bytes).
1033        skip_szl_entry_header(&mut szl_data);
1034        let total_count = szl_data.get_u32();
1035        let mut entries = Vec::new();
1036        while szl_data.remaining() >= 4 {
1037            entries.push(crate::types::BlockListEntry {
1038                block_type: szl_data.get_u16(),
1039                count: szl_data.get_u16(),
1040            });
1041        }
1042        Ok(crate::types::BlockList {
1043            total_count,
1044            entries,
1045        })
1046    }
1047
1048    /// Internal: send a UserData block-info request and return the raw response
1049    /// data section payload (4-byte envelope skipped).
1050    async fn block_info_query(
1051        &self,
1052        func: u8,
1053        block_type: u8,
1054        block_number: u16,
1055    ) -> Result<Bytes> {
1056        let mut inner = self.inner.lock().await;
1057        let pdu_ref = Self::next_pdu_ref(&mut inner);
1058
1059        // UserData param for block info (function 0x13 or 0x14):
1060        //   [8-byte header] [block_type(1)] [0x00] [block_number(2)]
1061        let mut param_buf = BytesMut::with_capacity(12);
1062        param_buf.extend_from_slice(&[
1063            0x00, 0x01, 0x12, 0x04, func, 0x44, 0x01, 0x00,
1064            block_type, 0x00,
1065        ]);
1066        param_buf.put_u16(block_number);
1067
1068        Self::send_s7(
1069            &mut inner,
1070            param_buf.freeze(),
1071            Bytes::new(),
1072            pdu_ref,
1073            PduType::UserData,
1074        )
1075        .await?;
1076
1077        let (header, mut body) = Self::recv_s7(&mut inner).await?;
1078
1079        // Skip echoed param section
1080        if body.remaining() < header.param_len as usize {
1081            return Err(Error::UnexpectedResponse);
1082        }
1083        body.advance(header.param_len as usize);
1084
1085        // Skip 4-byte data envelope (return_code, transport, data_len)
1086        if body.remaining() < 4 {
1087            return Err(Error::UnexpectedResponse);
1088        }
1089        body.advance(4);
1090
1091        Ok(body.copy_to_bytes(body.remaining()))
1092    }
1093
1094    /// Get detailed information about a block stored on the PLC.
1095    ///
1096    /// `block_type` should be one of the [`BlockType`](crate::types::BlockType)
1097    /// discriminant values (e.g. `0x41` for DB, `0x38` for OB).
1098    pub async fn get_ag_block_info(
1099        &self,
1100        block_type: u8,
1101        block_number: u16,
1102    ) -> Result<crate::types::BlockInfo> {
1103        self.get_block_info(0x13, block_type, block_number).await
1104    }
1105
1106    /// Get detailed block information from the PG perspective.
1107    ///
1108    /// Same fields as [`get_ag_block_info`](Self::get_ag_block_info) but the
1109    /// information is from the programming-device viewpoint.
1110    pub async fn get_pg_block_info(
1111        &self,
1112        block_type: u8,
1113        block_number: u16,
1114    ) -> Result<crate::types::BlockInfo> {
1115        self.get_block_info(0x14, block_type, block_number).await
1116    }
1117
1118    /// Shared implementation for AG and PG block info.
1119    async fn get_block_info(
1120        &self,
1121        func: u8,
1122        block_type: u8,
1123        block_number: u16,
1124    ) -> Result<crate::types::BlockInfo> {
1125        let payload = self
1126            .block_info_query(func, block_type, block_number)
1127            .await?;
1128        // Minimum for a valid block info: 6-byte header + block_type + block_number + language + flags + ...
1129        if payload.len() < 24 {
1130            return Err(Error::UnexpectedResponse);
1131        }
1132        let mut b = payload;
1133
1134        // Parse block info response (field order derived from S7 protocol):
1135        let _blk_type_hi = b.get_u16(); // may echo block type as u16
1136        let blk_number = b.get_u16();
1137        let language = b.get_u16();
1138        let flags = b.get_u16();
1139        let mc7_size = b.get_u16();
1140        let _size_lo = b.get_u16(); // load-memory size low word
1141        let size_ram = b.get_u16();
1142        let _size_ro = b.get_u16(); // 0 or RO-size
1143        let local_data = b.get_u16();
1144        let checksum = b.get_u16();
1145        let version = b.get_u16();
1146
1147        // String fields: author(8), family(8), header(20?), date(8)
1148        let author = if b.remaining() >= 8 {
1149            String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
1150        } else { String::new() };
1151        b.advance(8.min(b.remaining()));
1152
1153        let family = if b.remaining() >= 8 {
1154            String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
1155        } else { String::new() };
1156        b.advance(8.min(b.remaining()));
1157
1158        let header = if b.remaining() >= 20 {
1159            String::from_utf8_lossy(&b[..20]).trim_end_matches('\0').trim().to_string()
1160        } else { String::new() };
1161        b.advance(20.min(b.remaining()));
1162
1163        let date = if b.remaining() >= 8 {
1164            String::from_utf8_lossy(&b[..8]).trim_end_matches('\0').trim().to_string()
1165        } else { String::new() };
1166
1167        // Reconstruct total size from the two size halves
1168        let size = ((_blk_type_hi as u32) << 16) | (b.len() as u32 & 0xFFFF);
1169        let size_u16 = size.min(0xFFFF) as u16;
1170
1171        Ok(crate::types::BlockInfo {
1172            block_type: _blk_type_hi,
1173            block_number: blk_number,
1174            language,
1175            flags,
1176            size: size_u16,
1177            size_ram,
1178            mc7_size,
1179            local_data,
1180            checksum,
1181            version,
1182            author,
1183            family,
1184            header,
1185            date,
1186        })
1187    }
1188
1189    // -- Security / protection (set/clear password + get protection) ----------
1190
1191    /// Set a session password for protected PLC access.
1192    ///
1193    /// The password is obfuscated using the S7 nibble-swap + XOR-0x55 algorithm
1194    /// and sent as a Job PDU with function code 0x12.  Passwords longer than
1195    /// 8 bytes are truncated.
1196    pub async fn set_session_password(&self, password: &str) -> Result<()> {
1197        let encrypted = crate::types::encrypt_password(password);
1198        let mut inner = self.inner.lock().await;
1199        let pdu_ref = Self::next_pdu_ref(&mut inner);
1200        let param = Bytes::copy_from_slice(&[0x12, 0x00]);
1201        let data = Bytes::copy_from_slice(&encrypted);
1202        Self::send_s7(&mut inner, param, data, pdu_ref, PduType::Job).await?;
1203        let (header, _body) = Self::recv_s7(&mut inner).await?;
1204        check_plc_error(&header, "set_session_password")?;
1205        Ok(())
1206    }
1207
1208    /// Clear the session password on the PLC (function code 0x11).
1209    pub async fn clear_session_password(&self) -> Result<()> {
1210        let mut inner = self.inner.lock().await;
1211        let pdu_ref = Self::next_pdu_ref(&mut inner);
1212        let param = Bytes::copy_from_slice(&[0x11, 0x00]);
1213        Self::send_s7(&mut inner, param, Bytes::new(), pdu_ref, PduType::Job).await?;
1214        let (header, _body) = Self::recv_s7(&mut inner).await?;
1215        check_plc_error(&header, "clear_session_password")?;
1216        Ok(())
1217    }
1218
1219    /// Read the current protection level (SZL ID 0x0032, index 0x0004).
1220    ///
1221    /// Returns the protection scheme identifiers and level;
1222    /// `password_set` is `true` when the PLC reports a non-empty password.
1223    pub async fn get_protection(&self) -> Result<crate::types::Protection> {
1224        let payload = self.read_szl_payload(0x0032, 0x0004).await?;
1225        if payload.len() < 14 {
1226            return Err(Error::UnexpectedResponse);
1227        }
1228        let mut b = payload;
1229        let _block_len = b.get_u16();
1230        let _szl_id = b.get_u16();
1231        let _szl_ix = b.get_u16();
1232        // Skip the optional SZL entry_length prefix (2 bytes).
1233        skip_szl_entry_header(&mut b);
1234        let scheme_szl = b.get_u16();
1235        let scheme_module = b.get_u16();
1236        let scheme_bus = b.get_u16();
1237        let level = b.get_u16();
1238        // Next 8 bytes = pass_word field ("PASSWORD" if set, spaces otherwise)
1239        let pass_wort = if b.remaining() >= 8 {
1240            String::from_utf8_lossy(&b[..8]).trim().to_string()
1241        } else {
1242            String::new()
1243        };
1244        let password_set = pass_wort.eq_ignore_ascii_case("PASSWORD");
1245        Ok(crate::types::Protection {
1246            scheme_szl,
1247            scheme_module,
1248            scheme_bus,
1249            level,
1250            password_set,
1251        })
1252    }
1253
1254    // -- Block upload / download / delete ------------------------------------
1255    //
1256    // S7 function 0x1D = Upload  (sub-fn: 0=start, 1=data, 2=end)
1257    // S7 function 0x1E = Download (sub-fn: 0=start, 1=data, 2=end)
1258    // S7 function 0x1F = Delete
1259
1260    /// Delete a block from the PLC (S7 function code 0x1F).
1261    pub async fn delete_block(&self, block_type: u8, block_number: u16) -> Result<()> {
1262        let mut inner = self.inner.lock().await;
1263        let pdu_ref = Self::next_pdu_ref(&mut inner);
1264        // param: [0x1F, 0x00, block_type, 0x00, block_number(2)]
1265        let mut param = BytesMut::with_capacity(6);
1266        param.extend_from_slice(&[0x1F, 0x00, block_type, 0x00]);
1267        param.put_u16(block_number);
1268        Self::send_s7(
1269            &mut inner,
1270            param.freeze(),
1271            Bytes::new(),
1272            pdu_ref,
1273            PduType::Job,
1274        )
1275        .await?;
1276        let (header, _body) = Self::recv_s7(&mut inner).await?;
1277        check_plc_error(&header, "delete_block")?;
1278        Ok(())
1279    }
1280
1281    /// Upload a PLC block via S7 PI-Upload (function 0x1D).
1282    ///
1283    /// Returns the raw block bytes in Diagra format (20-byte header + payload).
1284    /// Use [`BlockData::from_bytes`] to parse the result.
1285    pub async fn upload(&self, block_type: u8, block_number: u16) -> Result<Vec<u8>> {
1286        let mut inner = self.inner.lock().await;
1287        let pdu_ref = Self::next_pdu_ref(&mut inner);
1288
1289        // --- Step 1: Start upload (sub-fn=0x00) ---
1290        // param: [0x1D, 0x00, block_type, 0x00, block_number(2)]
1291        let mut param = BytesMut::with_capacity(6);
1292        param.extend_from_slice(&[0x1D, 0x00, block_type, 0x00]);
1293        param.put_u16(block_number);
1294        Self::send_s7(
1295            &mut inner,
1296            param.freeze(),
1297            Bytes::new(),
1298            pdu_ref,
1299            PduType::Job,
1300        )
1301        .await?;
1302        let (header, mut body) = Self::recv_s7(&mut inner).await?;
1303        check_plc_error(&header, "upload_start")?;
1304        // Response data: [upload_id(4)][total_len(4)]
1305        if body.remaining() < 8 {
1306            return Err(Error::UnexpectedResponse);
1307        }
1308        if body.remaining() >= 2 {
1309            body.advance(2); // skip param echo
1310        }
1311        let upload_id = body.get_u32();
1312        let _total_len = body.get_u32();
1313
1314        // --- Step 2: Loop data chunks (sub-fn=0x01) ---
1315        let mut block_data = Vec::new();
1316        loop {
1317            let chunk_pdu_ref = Self::next_pdu_ref(&mut inner);
1318            let mut dparam = BytesMut::with_capacity(6);
1319            dparam.extend_from_slice(&[0x1D, 0x01]);
1320            dparam.put_u32(upload_id);
1321            Self::send_s7(
1322                &mut inner,
1323                dparam.freeze(),
1324                Bytes::new(),
1325                chunk_pdu_ref,
1326                PduType::Job,
1327            )
1328            .await?;
1329            let (dheader, mut dbody) = Self::recv_s7(&mut inner).await?;
1330            check_plc_error(&dheader, "upload_data")?;
1331            // Skip param echo
1332            if dbody.remaining() >= 2 {
1333                dbody.advance(2);
1334            }
1335            if dbody.is_empty() {
1336                break; // no more data
1337            }
1338            // The first data PDU may have a 4-byte "data header" before the actual block data
1339            // (return_code + transport + bit_len).  Skip it.
1340            if block_data.is_empty() && dbody.remaining() >= 4 {
1341                // Peek at the first byte — if it looks like a return_code (0xFF), skip 4
1342                if dbody[0] == 0xFF || dbody[0] == 0x00 {
1343                    dbody.advance(4);
1344                }
1345            }
1346            let chunk = dbody.copy_to_bytes(dbody.remaining());
1347            block_data.extend_from_slice(&chunk);
1348
1349            // If this chunk was smaller than PDU size, it's the last one
1350            if chunk.len() < inner.connection.pdu_size as usize - 50 {
1351                break;
1352            }
1353            // Safety: prevent infinite loop on broken PLC
1354            if block_data.len() > 1024 * 1024 * 4 {
1355                // 4 MB
1356                return Err(Error::UnexpectedResponse);
1357            }
1358        }
1359
1360        // --- Step 3: End upload (sub-fn=0x02) ---
1361        let end_pdu_ref = Self::next_pdu_ref(&mut inner);
1362        let mut eparam = BytesMut::with_capacity(6);
1363        eparam.extend_from_slice(&[0x1D, 0x02]);
1364        eparam.put_u32(upload_id);
1365        Self::send_s7(
1366            &mut inner,
1367            eparam.freeze(),
1368            Bytes::new(),
1369            end_pdu_ref,
1370            PduType::Job,
1371        )
1372        .await?;
1373        let (eheader, _ebody) = Self::recv_s7(&mut inner).await?;
1374        check_plc_error(&eheader, "upload_end")?;
1375
1376        Ok(block_data)
1377    }
1378
1379    /// Upload a DB block (convenience wrapper around [`upload`](Self::upload)).
1380    pub async fn db_get(&self, db_number: u16) -> Result<Vec<u8>> {
1381        self.upload(0x41, db_number).await // Block_DB = 0x41
1382    }
1383
1384    /// Download a block to the PLC (S7 function 0x1E).
1385    ///
1386    /// `data` should be in Diagra format (20-byte header + payload, as returned by
1387    /// [`upload`](Self::upload) or built via [`BlockData::to_bytes`]).
1388    pub async fn download(&self, block_type: u8, block_number: u16, data: &[u8]) -> Result<()> {
1389        let total_len = data.len() as u32;
1390        let mut inner = self.inner.lock().await;
1391        let pdu_avail = (inner.connection.pdu_size as usize).saturating_sub(50);
1392
1393        // --- Step 1: Start download (sub-fn=0x00) ---
1394        let start_ref = Self::next_pdu_ref(&mut inner);
1395        // param: [0x1E, 0x00, block_type, 0x00, block_number(2), total_len(4)]
1396        let mut sparam = BytesMut::with_capacity(10);
1397        sparam.extend_from_slice(&[0x1E, 0x00, block_type, 0x00]);
1398        sparam.put_u16(block_number);
1399        sparam.put_u32(total_len);
1400
1401        // First data chunk
1402        let chunk_len = pdu_avail.min(data.len());
1403        let first_chunk = Bytes::copy_from_slice(&data[..chunk_len]);
1404        Self::send_s7(
1405            &mut inner,
1406            sparam.freeze(),
1407            first_chunk,
1408            start_ref,
1409            PduType::Job,
1410        )
1411        .await?;
1412
1413        let (sheader, mut sbody) = Self::recv_s7(&mut inner).await?;
1414        check_plc_error(&sheader, "download_start")?;
1415        // Response: [download_id(4)]
1416        if sbody.remaining() >= 2 {
1417            sbody.advance(2); // skip param echo
1418        }
1419        if sbody.remaining() < 4 {
1420            return Err(Error::UnexpectedResponse);
1421        }
1422        let download_id = sbody.get_u32();
1423
1424        let mut offset = chunk_len;
1425
1426        // --- Step 2: Send remaining data chunks (sub-fn=0x01) ---
1427        while offset < data.len() {
1428            let chunk_ref = Self::next_pdu_ref(&mut inner);
1429            let end = (offset + pdu_avail).min(data.len());
1430            let chunk = Bytes::copy_from_slice(&data[offset..end]);
1431
1432            let mut dparam = BytesMut::with_capacity(6);
1433            dparam.extend_from_slice(&[0x1E, 0x01]);
1434            dparam.put_u32(download_id);
1435
1436            Self::send_s7(
1437                &mut inner,
1438                dparam.freeze(),
1439                chunk,
1440                chunk_ref,
1441                PduType::Job,
1442            )
1443            .await?;
1444
1445            let (dheader, _dbody) = Self::recv_s7(&mut inner).await?;
1446            check_plc_error(&dheader, "download_data")?;
1447            offset = end;
1448        }
1449
1450        // --- Step 3: End download (sub-fn=0x02) ---
1451        let end_ref = Self::next_pdu_ref(&mut inner);
1452        let mut eparam = BytesMut::with_capacity(6);
1453        eparam.extend_from_slice(&[0x1E, 0x02]);
1454        eparam.put_u32(download_id);
1455        Self::send_s7(
1456            &mut inner,
1457            eparam.freeze(),
1458            Bytes::new(),
1459            end_ref,
1460            PduType::Job,
1461        )
1462        .await?;
1463        let (eheader, _ebody) = Self::recv_s7(&mut inner).await?;
1464        check_plc_error(&eheader, "download_end")?;
1465
1466        Ok(())
1467    }
1468
1469    /// Fill a DB with a constant byte value.
1470    ///
1471    /// Uses [`get_ag_block_info`](Self::get_ag_block_info) to determine the DB
1472    /// size, then writes every byte to `value`.
1473    pub async fn db_fill(&self, db_number: u16, value: u8) -> Result<()> {
1474        let info = self.get_ag_block_info(0x41, db_number).await?; // Block_DB = 0x41
1475        let size = info.size as usize;
1476        if size == 0 {
1477            return Err(Error::PlcError {
1478                code: 0,
1479                message: format!("DB{db_number} has zero size"),
1480            });
1481        }
1482        let data = vec![value; size];
1483        // Write in chunks to respect PDU limits
1484        let chunk_size = 240usize; // conservative
1485        for offset in (0..size).step_by(chunk_size) {
1486            let end = (offset + chunk_size).min(size);
1487            self.db_write(db_number, offset as u32, &data[offset..end])
1488                .await?;
1489        }
1490        Ok(())
1491    }
1492}
1493
1494/// If the leading bytes look like an SZL entry_length header (2-byte big-endian u16
1495/// length value where the high byte is zero), skip them.  Real Siemens PLCs include
1496/// this header; our test server omits it.
1497fn skip_szl_entry_header(data: &mut Bytes) {
1498    if data.len() >= 2 && data[0] == 0x00 && data[1] > 0 && data[1] <= 200 {
1499        data.advance(2);
1500    }
1501}
1502
1503/// Scan byte data for sequences of visible ASCII characters and return them
1504/// as a vector of trimmed strings.  Skips non-ASCII and control bytes between
1505/// sequences.  Useful for extracting CPU info fields from SZL responses across
1506/// different PLC models and firmware versions.
1507fn scan_ascii_fields(data: &[u8], max_count: usize, min_len: usize) -> Vec<String> {
1508    let mut fields = Vec::new();
1509    let mut i = 0;
1510    while i < data.len() && fields.len() < max_count {
1511        // Skip bytes that are not visible ASCII (0x20-0x7E)
1512        if !data[i].is_ascii_graphic() && data[i] != b' ' {
1513            i += 1;
1514            continue;
1515        }
1516        // Collect a run of visible ASCII
1517        let start = i;
1518        while i < data.len() && (data[i].is_ascii_graphic() || data[i] == b' ') {
1519            i += 1;
1520        }
1521        let s = String::from_utf8_lossy(&data[start..i]).trim().to_string();
1522        if s.len() >= min_len {
1523            fields.push(s);
1524        }
1525    }
1526    fields
1527}
1528
1529/// Parse the S7-300 sub-record format used in SZL 0x001C responses.
1530///
1531/// This format uses tagged records: `[00 <tag> <string>] ...` where
1532/// known tags are:
1533/// - 0x01: order code / module identification
1534/// - 0x05: plant identification (AS name)
1535/// - 0x06: serial number
1536/// - 0x07: module type name
1537/// - 0x08: module name
1538fn parse_sub_record_fields(b: &[u8]) -> (String, String, String, String, String) {
1539    let mut module_type = String::new();
1540    let mut serial_number = String::new();
1541    let mut as_name = String::new();
1542    let mut copyright = String::new();
1543    let mut module_name = String::new();
1544
1545    let mut i = 0;
1546    while i + 2 < b.len() {
1547        // Look for 00 <tag> pattern with a known sub-record tag (1..=8)
1548        if b[i] == 0x00 && (1..=8).contains(&b[i + 1]) {
1549            let tag = b[i + 1];
1550            let start = i + 2;
1551
1552            // Find end of string: next 0x00 byte (including 00 C0)
1553            let mut end = start;
1554            while end < b.len() && b[end] != 0x00 {
1555                end += 1;
1556            }
1557
1558            let raw = &b[start..end];
1559            let val = String::from_utf8_lossy(raw).trim().to_string();
1560
1561            // Skip empty and firmware-label values
1562            let su = val.to_uppercase();
1563            if !val.is_empty() && !su.contains("BOOT") && !su.starts_with("P B") {
1564                match tag {
1565                    0x01 => {
1566                        // Tag 0x01 may be order code (starts with "6ES") or module type.
1567                        if !val.starts_with("6ES") && module_type.is_empty() {
1568                            module_type = val;
1569                        }
1570                    }
1571                    0x05 => { if as_name.is_empty() { as_name = val; } }
1572                    0x06 => { if serial_number.is_empty() { serial_number = val; } }
1573                    0x07 => { if module_type.is_empty() { module_type = val; } }
1574                    0x08 => { if module_name.is_empty() { module_name = val; } }
1575                    _ => {}
1576                }
1577            }
1578
1579            i = end;
1580        } else {
1581            i += 1;
1582        }
1583    }
1584
1585    // Also scan for free-standing printable strings that look like copyright
1586    // (e.g. "Boot Loader" appearing after the tagged records).
1587    if copyright.is_empty() {
1588        let mut scan = 0;
1589        while scan < b.len() {
1590            if b[scan].is_ascii_graphic() || b[scan] == b' ' {
1591                let s = scan;
1592                while scan < b.len() && (b[scan].is_ascii_graphic() || b[scan] == b' ') {
1593                    scan += 1;
1594                }
1595                let val = String::from_utf8_lossy(&b[s..scan]).trim().to_string();
1596                let su = val.to_uppercase();
1597                if val.len() >= 3 {
1598                    if su.contains("BOOT") || su.starts_with("P B") {
1599                        copyright = val;
1600                        break;
1601                    }
1602                }
1603            } else {
1604                scan += 1;
1605            }
1606        }
1607    }
1608
1609    (module_type, serial_number, as_name, copyright, module_name)
1610}
1611
1612/// Determine the S7 protocol variant from the raw SZL payload and extracted module type.
1613///
1614/// - S7-1200/1500/ET200SP uses S7+ protocol: detected from the 0x00 0x01 record marker in the
1615///   payload, or from a module_type containing `"15"` in its model number.
1616/// - Everything else (S7-300, S7-400, S7-1200) uses classic S7 protocol.
1617fn detect_protocol(_payload: &[u8], module_type: &str) -> crate::types::Protocol {
1618    // S7+ protocol: S7-1200, S7-1500, ET 200SP CPU
1619    // Classic S7: S7-300, S7-400
1620    let upper = module_type.to_uppercase();
1621    let is_s7plus = upper.contains("1500")
1622        || upper.contains("1200")
1623        || upper.contains("ET 200SP")
1624        || upper.contains("ET200SP")
1625        // "CPU 15xx" catches 1511, 1513, 1515, 1516, 1517, 1518
1626        || (upper.contains("CPU") && {
1627            let after_cpu = upper.find("CPU").map(|i| &upper[i+3..]).unwrap_or("");
1628            let num: String = after_cpu.chars().skip_while(|c| !c.is_ascii_digit()).take_while(|c| c.is_ascii_digit()).collect();
1629            matches!(num.get(..2), Some("12") | Some("15"))
1630        });
1631
1632    if is_s7plus {
1633        crate::types::Protocol::S7Plus
1634    } else {
1635        crate::types::Protocol::S7
1636    }
1637}
1638
1639
1640/// Decode common S7 protocol error class/code pairs into human-readable descriptions.
1641fn s7_error_description(ec: u8, ecd: u8) -> &'static str {
1642    match (ec, ecd) {
1643        (0x81, 0x04) => "function not supported or access denied by PLC",
1644        (0x81, 0x01) => "reserved by HW or SW function not available",
1645        (0x82, 0x04) => "PLC is in STOP mode, function not possible",
1646        (0x05, 0x01) => "invalid block type number",
1647        (0xD2, 0x01) => "object already exists, download rejected",
1648        (0xD2, 0x02) => "object does not exist, upload failed",
1649        (0xD6, 0x01) => "password protection violation",
1650        (0xD6, 0x05) => "insufficient privilege for this operation",
1651        _ => "unknown error",
1652    }
1653}
1654
1655fn check_plc_error(header: &S7Header, context: &str) -> Result<()> {
1656    if let (Some(ec), Some(ecd)) = (header.error_class, header.error_code) {
1657        if ec != 0 || ecd != 0 {
1658            let detail = s7_error_description(ec, ecd);
1659            return Err(Error::PlcError {
1660                code: ((ec as u32) << 8) | ecd as u32,
1661                message: format!("{}: {} (error_class=0x{ec:02X}, error_code=0x{ecd:02X})", context, detail),
1662            });
1663        }
1664    }
1665    Ok(())
1666}
1667
1668impl S7Client<crate::transport::TcpTransport> {
1669    pub async fn connect(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
1670        let transport =
1671            crate::transport::TcpTransport::connect(addr, params.connect_timeout).await?;
1672        Self::from_transport(transport, params).await
1673    }
1674}
1675
1676impl S7Client<crate::UdpTransport> {
1677    /// Connect to a PLC using UDP transport.
1678    pub async fn connect_udp(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
1679        let transport = crate::UdpTransport::connect(addr)
1680            .await
1681            .map_err(Error::Io)?;
1682        Self::from_transport(transport, params).await
1683    }
1684}
1685
1686#[cfg(test)]
1687mod tests {
1688    use super::*;
1689    use bytes::BufMut;
1690    use crate::proto::{
1691        cotp::CotpPdu,
1692        s7::{
1693            header::{PduType, S7Header},
1694            negotiate::NegotiateResponse,
1695        },
1696        tpkt::TpktFrame,
1697    };
1698    use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
1699
1700    async fn mock_plc_db_read(mut server_io: tokio::io::DuplexStream, response_data: Vec<u8>) {
1701        let mut buf = vec![0u8; 4096];
1702
1703        // respond to COTP CR
1704        let _ = server_io.read(&mut buf).await;
1705        let cc = CotpPdu::ConnectConfirm {
1706            dst_ref: 1,
1707            src_ref: 1,
1708        };
1709        let mut cb = BytesMut::new();
1710        cc.encode(&mut cb);
1711        let mut tb = BytesMut::new();
1712        TpktFrame {
1713            payload: cb.freeze(),
1714        }
1715        .encode(&mut tb)
1716        .unwrap();
1717        server_io.write_all(&tb).await.unwrap();
1718
1719        // respond to S7 negotiate
1720        let _ = server_io.read(&mut buf).await;
1721        let neg = NegotiateResponse {
1722            max_amq_calling: 1,
1723            max_amq_called: 1,
1724            pdu_length: 480,
1725        };
1726        let mut s7b = BytesMut::new();
1727        S7Header {
1728            pdu_type: PduType::AckData,
1729            reserved: 0,
1730            pdu_ref: 1,
1731            param_len: 8,
1732            data_len: 0,
1733            error_class: Some(0),
1734            error_code: Some(0),
1735        }
1736        .encode(&mut s7b);
1737        neg.encode(&mut s7b);
1738        let dt = CotpPdu::Data {
1739            tpdu_nr: 0,
1740            last: true,
1741            payload: s7b.freeze(),
1742        };
1743        let mut cb = BytesMut::new();
1744        dt.encode(&mut cb);
1745        let mut tb = BytesMut::new();
1746        TpktFrame {
1747            payload: cb.freeze(),
1748        }
1749        .encode(&mut tb)
1750        .unwrap();
1751        server_io.write_all(&tb).await.unwrap();
1752
1753        // respond to db_read
1754        let _ = server_io.read(&mut buf).await;
1755        let mut s7b = BytesMut::new();
1756        S7Header {
1757            pdu_type: PduType::AckData,
1758            reserved: 0,
1759            pdu_ref: 2,
1760            param_len: 2,
1761            data_len: (4 + response_data.len()) as u16,
1762            error_class: Some(0),
1763            error_code: Some(0),
1764        }
1765        .encode(&mut s7b);
1766        s7b.extend_from_slice(&[0x04, 0x01]); // ReadVar func + 1 item
1767        s7b.put_u8(0xFF); // return_code = success
1768        s7b.put_u8(0x04); // transport = word
1769        s7b.put_u16((response_data.len() * 8) as u16);
1770        s7b.extend_from_slice(&response_data);
1771        let dt = CotpPdu::Data {
1772            tpdu_nr: 0,
1773            last: true,
1774            payload: s7b.freeze(),
1775        };
1776        let mut cb = BytesMut::new();
1777        dt.encode(&mut cb);
1778        let mut tb = BytesMut::new();
1779        TpktFrame {
1780            payload: cb.freeze(),
1781        }
1782        .encode(&mut tb)
1783        .unwrap();
1784        server_io.write_all(&tb).await.unwrap();
1785    }
1786
1787    #[tokio::test]
1788    async fn db_read_returns_data() {
1789        let (client_io, server_io) = duplex(4096);
1790        let params = ConnectParams::default();
1791        let expected = vec![0xDE, 0xAD, 0xBE, 0xEF];
1792        tokio::spawn(mock_plc_db_read(server_io, expected.clone()));
1793        let client = S7Client::from_transport(client_io, params).await.unwrap();
1794        let data = client.db_read(1, 0, 4).await.unwrap();
1795        assert_eq!(&data[..], &expected[..]);
1796    }
1797
1798    /// Mock that handles COTP+Negotiate handshake then serves one multi-read response.
1799    async fn mock_plc_multi_read(
1800        mut server_io: tokio::io::DuplexStream,
1801        items: Vec<Vec<u8>>, // one byte vec per item
1802    ) {
1803        let mut buf = vec![0u8; 4096];
1804
1805        // COTP CR
1806        let _ = server_io.read(&mut buf).await;
1807        let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1808        let mut cb = BytesMut::new();
1809        cc.encode(&mut cb);
1810        let mut tb = BytesMut::new();
1811        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1812        server_io.write_all(&tb).await.unwrap();
1813
1814        // S7 Negotiate
1815        let _ = server_io.read(&mut buf).await;
1816        let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
1817        let mut s7b = BytesMut::new();
1818        S7Header {
1819            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1820            param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1821        }.encode(&mut s7b);
1822        neg.encode(&mut s7b);
1823        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1824        let mut cb = BytesMut::new(); dt.encode(&mut cb);
1825        let mut tb = BytesMut::new();
1826        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1827        server_io.write_all(&tb).await.unwrap();
1828
1829        // ReadMultiVar request
1830        let _ = server_io.read(&mut buf).await;
1831
1832        // Build response data: one DataItem per input item
1833        let item_count = items.len() as u8;
1834        let mut data_bytes = BytesMut::new();
1835        for item_data in &items {
1836            data_bytes.put_u8(0xFF); // return_code OK
1837            data_bytes.put_u8(0x04); // transport byte
1838            data_bytes.put_u16((item_data.len() * 8) as u16);
1839            data_bytes.extend_from_slice(item_data);
1840            if item_data.len() % 2 != 0 {
1841                data_bytes.put_u8(0x00); // pad
1842            }
1843        }
1844        let data_len = data_bytes.len() as u16;
1845        let mut s7b = BytesMut::new();
1846        S7Header {
1847            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
1848            param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
1849        }.encode(&mut s7b);
1850        s7b.extend_from_slice(&[0x04, item_count]); // func + item_count
1851        s7b.extend_from_slice(&data_bytes);
1852
1853        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1854        let mut cb = BytesMut::new(); dt.encode(&mut cb);
1855        let mut tb = BytesMut::new();
1856        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1857        server_io.write_all(&tb).await.unwrap();
1858    }
1859
1860    #[tokio::test]
1861    async fn read_multi_vars_returns_all_items() {
1862        let (client_io, server_io) = duplex(4096);
1863        let params = ConnectParams::default();
1864        let item1 = vec![0xDE, 0xAD, 0xBE, 0xEF];
1865        let item2 = vec![0x01, 0x02];
1866        tokio::spawn(mock_plc_multi_read(server_io, vec![item1.clone(), item2.clone()]));
1867        let client = S7Client::from_transport(client_io, params).await.unwrap();
1868        let items = [MultiReadItem::db(1, 0, 4), MultiReadItem::db(2, 10, 2)];
1869        let results = client.read_multi_vars(&items).await.unwrap();
1870        assert_eq!(results.len(), 2);
1871        assert_eq!(&results[0][..], &item1[..]);
1872        assert_eq!(&results[1][..], &item2[..]);
1873    }
1874
1875    #[tokio::test]
1876    async fn read_multi_vars_empty_returns_empty() {
1877        let (client_io, server_io) = duplex(4096);
1878        let params = ConnectParams::default();
1879        tokio::spawn(mock_plc_multi_read(server_io, vec![]));
1880        let client = S7Client::from_transport(client_io, params).await.unwrap();
1881        let results = client.read_multi_vars(&[]).await.unwrap();
1882        assert!(results.is_empty());
1883    }
1884
1885    /// Mock that handles COTP+Negotiate then serves N write-response round-trips.
1886    /// `batches` is a list of item counts per round-trip; the mock sends 0xFF for each.
1887    async fn mock_plc_multi_write(
1888        mut server_io: tokio::io::DuplexStream,
1889        pdu_size: u16,
1890        batches: Vec<usize>,
1891    ) {
1892        let mut buf = vec![0u8; 65536];
1893
1894        // COTP CR
1895        let _ = server_io.read(&mut buf).await;
1896        let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1897        let mut cb = BytesMut::new(); cc.encode(&mut cb);
1898        let mut tb = BytesMut::new();
1899        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1900        server_io.write_all(&tb).await.unwrap();
1901
1902        // S7 Negotiate
1903        let _ = server_io.read(&mut buf).await;
1904        let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: pdu_size };
1905        let mut s7b = BytesMut::new();
1906        S7Header {
1907            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
1908            param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
1909        }.encode(&mut s7b);
1910        neg.encode(&mut s7b);
1911        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1912        let mut cb = BytesMut::new(); dt.encode(&mut cb);
1913        let mut tb = BytesMut::new();
1914        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1915        server_io.write_all(&tb).await.unwrap();
1916
1917        // One round-trip per batch
1918        for (i, item_count) in batches.iter().enumerate() {
1919            let _ = server_io.read(&mut buf).await;
1920            // WriteVar response: param = func(0x05) + count; data = return_code per item
1921            let mut s7b = BytesMut::new();
1922            S7Header {
1923                pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
1924                param_len: 2, data_len: *item_count as u16,
1925                error_class: Some(0), error_code: Some(0),
1926            }.encode(&mut s7b);
1927            s7b.extend_from_slice(&[0x05, *item_count as u8]); // func + count
1928            for _ in 0..*item_count {
1929                s7b.put_u8(0xFF); // success
1930            }
1931            let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
1932            let mut cb = BytesMut::new(); dt.encode(&mut cb);
1933            let mut tb = BytesMut::new();
1934            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1935            server_io.write_all(&tb).await.unwrap();
1936        }
1937    }
1938
1939    #[tokio::test]
1940    async fn write_multi_vars_returns_ok() {
1941        let (client_io, server_io) = duplex(65536);
1942        let params = ConnectParams::default();
1943        tokio::spawn(mock_plc_multi_write(server_io, 480, vec![2]));
1944        let client = S7Client::from_transport(client_io, params).await.unwrap();
1945        let items = [
1946            MultiWriteItem::db(1, 0, vec![0xAA, 0xBB, 0xCC, 0xDD]),
1947            MultiWriteItem::db(2, 10, vec![0x01, 0x02]),
1948        ];
1949        client.write_multi_vars(&items).await.unwrap();
1950    }
1951
1952    #[tokio::test]
1953    async fn write_multi_vars_empty_returns_ok() {
1954        let (client_io, server_io) = duplex(4096);
1955        let params = ConnectParams::default();
1956        // No messages exchanged after handshake — the mock just needs to satisfy connect.
1957        tokio::spawn(mock_plc_multi_write(server_io, 480, vec![]));
1958        let client = S7Client::from_transport(client_io, params).await.unwrap();
1959        client.write_multi_vars(&[]).await.unwrap();
1960    }
1961
1962    /// Items split into two round-trips when PDU budget is exhausted.
1963    ///
1964    /// PDU = 64. max_payload = 64 - 10(hdr) - 2(overhead) = 52.
1965    /// Each item: 12(addr) + 4(data hdr) + 20(data) = 36.
1966    /// Two items = 72 > 52 → must split into two 1-item batches.
1967    #[tokio::test]
1968    async fn write_multi_vars_batches_when_pdu_limit_exceeded() {
1969        let (client_io, server_io) = duplex(65536);
1970        let params = ConnectParams::default();
1971        tokio::spawn(mock_plc_multi_write(server_io, 64, vec![1, 1]));
1972        let client = S7Client::from_transport(client_io, params).await.unwrap();
1973        let items = [
1974            MultiWriteItem::db(1, 0, vec![0x11u8; 20]),
1975            MultiWriteItem::db(2, 0, vec![0x22u8; 20]),
1976        ];
1977        client.write_multi_vars(&items).await.unwrap();
1978    }
1979
1980    /// Items are split into two round trips when response would exceed the negotiated PDU size.
1981    ///
1982    /// PDU = 64 bytes. max_resp_payload = 64 - 10(hdr) - 2(func+count) = 52 bytes.
1983    /// Each item with 30 bytes of data costs 4+30 = 34 bytes in the response.
1984    /// Two such items = 68 bytes → exceeds 52 → must split into 2 round trips.
1985    #[tokio::test]
1986    async fn read_multi_vars_batches_when_pdu_limit_exceeded() {
1987        use crate::proto::s7::negotiate::NegotiateResponse;
1988
1989        async fn mock_split_pdu(mut server_io: tokio::io::DuplexStream) {
1990            let mut buf = vec![0u8; 4096];
1991
1992            // COTP CR
1993            let _ = server_io.read(&mut buf).await;
1994            let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
1995            let mut cb = BytesMut::new(); cc.encode(&mut cb);
1996            let mut tb = BytesMut::new();
1997            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
1998            server_io.write_all(&tb).await.unwrap();
1999
2000            // Negotiate — PDU size 64
2001            let _ = server_io.read(&mut buf).await;
2002            let neg = NegotiateResponse {
2003                max_amq_calling: 1, max_amq_called: 1, pdu_length: 64,
2004            };
2005            let mut s7b = BytesMut::new();
2006            S7Header {
2007                pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
2008                param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
2009            }.encode(&mut s7b);
2010            neg.encode(&mut s7b);
2011            let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
2012            let mut cb = BytesMut::new(); dt.encode(&mut cb);
2013            let mut tb = BytesMut::new();
2014            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2015            server_io.write_all(&tb).await.unwrap();
2016
2017            // Two separate round-trips, one item each
2018            let payloads: &[&[u8]] = &[&[0x11u8; 30], &[0x22u8; 30]];
2019            for (i, payload) in payloads.iter().enumerate() {
2020                let _ = server_io.read(&mut buf).await;
2021                let bit_len = (payload.len() * 8) as u16;
2022                let mut data_bytes = BytesMut::new();
2023                data_bytes.put_u8(0xFF);
2024                data_bytes.put_u8(0x04);
2025                data_bytes.put_u16(bit_len);
2026                data_bytes.extend_from_slice(payload);
2027                if payload.len() % 2 != 0 { data_bytes.put_u8(0x00); }
2028                let data_len = data_bytes.len() as u16;
2029                let mut s7b = BytesMut::new();
2030                S7Header {
2031                    pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
2032                    param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
2033                }.encode(&mut s7b);
2034                s7b.extend_from_slice(&[0x04, 0x01]);
2035                s7b.extend_from_slice(&data_bytes);
2036                let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
2037                let mut cb = BytesMut::new(); dt.encode(&mut cb);
2038                let mut tb = BytesMut::new();
2039                TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2040                server_io.write_all(&tb).await.unwrap();
2041            }
2042        }
2043
2044        let (client_io, server_io) = duplex(4096);
2045        let params = ConnectParams::default();
2046        tokio::spawn(mock_split_pdu(server_io));
2047        let client = S7Client::from_transport(client_io, params).await.unwrap();
2048
2049        let items = [MultiReadItem::db(1, 0, 30), MultiReadItem::db(2, 0, 30)];
2050        let results = client.read_multi_vars(&items).await.unwrap();
2051        assert_eq!(results.len(), 2);
2052        assert_eq!(&results[0][..], &[0x11u8; 30][..]);
2053        assert_eq!(&results[1][..], &[0x22u8; 30][..]);
2054    }
2055
2056    // -- PLC control & status mocks & tests -----------------------------------
2057
2058    /// Common handshake for control tests: COTP CR → CC, S7 Negotiate.
2059    async fn mock_handshake(server_io: &mut (impl AsyncRead + AsyncWrite + Unpin)) {
2060        let mut buf = vec![0u8; 4096];
2061
2062        // COTP CR
2063        let _ = server_io.read(&mut buf).await;
2064        let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
2065        let mut cb = BytesMut::new(); cc.encode(&mut cb);
2066        let mut tb = BytesMut::new();
2067        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2068        server_io.write_all(&tb).await.unwrap();
2069
2070        // S7 Negotiate
2071        let _ = server_io.read(&mut buf).await;
2072        let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
2073        let mut s7b = BytesMut::new();
2074        S7Header {
2075            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
2076            param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
2077        }.encode(&mut s7b);
2078        neg.encode(&mut s7b);
2079        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
2080        let mut cb = BytesMut::new(); dt.encode(&mut cb);
2081        let mut tb = BytesMut::new();
2082        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2083        server_io.write_all(&tb).await.unwrap();
2084    }
2085
2086    /// Mock for simple control commands (plc_stop / plc_hot_start / plc_cold_start).
2087    /// `ok` controls whether the mock sends success (error_class=0, error_code=0) or failure.
2088    async fn mock_plc_control(
2089        mut server_io: tokio::io::DuplexStream,
2090        ok: bool,
2091    ) {
2092        let mut buf = vec![0u8; 4096];
2093        mock_handshake(&mut server_io).await;
2094
2095        // Control request — consume
2096        let _ = server_io.read(&mut buf).await;
2097
2098        // AckData response
2099        let (ec, ecd) = if ok { (0u8, 0u8) } else { (0x81u8, 0x04u8) };
2100        let mut s7b = BytesMut::new();
2101        S7Header {
2102            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
2103            param_len: 0, data_len: 0,
2104            error_class: Some(ec), error_code: Some(ecd),
2105        }.encode(&mut s7b);
2106        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
2107        let mut cb = BytesMut::new(); dt.encode(&mut cb);
2108        let mut tb = BytesMut::new();
2109        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2110        server_io.write_all(&tb).await.unwrap();
2111    }
2112
2113    #[tokio::test]
2114    async fn plc_stop_succeeds() {
2115        let (client_io, server_io) = duplex(4096);
2116        let params = ConnectParams::default();
2117        tokio::spawn(mock_plc_control(server_io, true));
2118        let client = S7Client::from_transport(client_io, params).await.unwrap();
2119        client.plc_stop().await.unwrap();
2120    }
2121
2122    #[tokio::test]
2123    async fn plc_hot_start_succeeds() {
2124        let (client_io, server_io) = duplex(4096);
2125        let params = ConnectParams::default();
2126        tokio::spawn(mock_plc_control(server_io, true));
2127        let client = S7Client::from_transport(client_io, params).await.unwrap();
2128        client.plc_hot_start().await.unwrap();
2129    }
2130
2131    #[tokio::test]
2132    async fn plc_cold_start_succeeds() {
2133        let (client_io, server_io) = duplex(4096);
2134        let params = ConnectParams::default();
2135        tokio::spawn(mock_plc_control(server_io, true));
2136        let client = S7Client::from_transport(client_io, params).await.unwrap();
2137        client.plc_cold_start().await.unwrap();
2138    }
2139
2140    #[tokio::test]
2141    async fn plc_stop_rejected_returns_error() {
2142        let (client_io, server_io) = duplex(4096);
2143        let params = ConnectParams::default();
2144        tokio::spawn(mock_plc_control(server_io, false));
2145        let client = S7Client::from_transport(client_io, params).await.unwrap();
2146        let result = client.plc_stop().await;
2147        assert!(result.is_err());
2148    }
2149
2150    /// Mock for get_plc_status: sends back `status_byte` in the data section.
2151    async fn mock_plc_status(
2152        mut server_io: tokio::io::DuplexStream,
2153        status_byte: u8,
2154    ) {
2155        let mut buf = vec![0u8; 4096];
2156        mock_handshake(&mut server_io).await;
2157
2158        // GetPlcStatus request — consume
2159        let _ = server_io.read(&mut buf).await;
2160
2161        // Response: param echo [0x31, 0x00] + status byte
2162        let data = &[0x31u8, 0x00, status_byte]; // param(2) + data(1)
2163        let data_len = data.len() as u16;
2164        let mut s7b = BytesMut::new();
2165        S7Header {
2166            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
2167            param_len: 2, data_len,
2168            error_class: Some(0), error_code: Some(0),
2169        }.encode(&mut s7b);
2170        s7b.extend_from_slice(data);
2171        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
2172        let mut cb = BytesMut::new(); dt.encode(&mut cb);
2173        let mut tb = BytesMut::new();
2174        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
2175        server_io.write_all(&tb).await.unwrap();
2176    }
2177
2178    #[tokio::test]
2179    async fn get_plc_status_returns_run() {
2180        let (client_io, server_io) = duplex(4096);
2181        let params = ConnectParams::default();
2182        tokio::spawn(mock_plc_status(server_io, 0x08));
2183        let client = S7Client::from_transport(client_io, params).await.unwrap();
2184        let status = client.get_plc_status().await.unwrap();
2185        assert_eq!(status, crate::types::PlcStatus::Run);
2186    }
2187
2188    #[tokio::test]
2189    async fn get_plc_status_returns_stop() {
2190        let (client_io, server_io) = duplex(4096);
2191        let params = ConnectParams::default();
2192        tokio::spawn(mock_plc_status(server_io, 0x04));
2193        let client = S7Client::from_transport(client_io, params).await.unwrap();
2194        let status = client.get_plc_status().await.unwrap();
2195        assert_eq!(status, crate::types::PlcStatus::Stop);
2196    }
2197
2198    #[tokio::test]
2199    async fn get_plc_status_returns_unknown() {
2200        let (client_io, server_io) = duplex(4096);
2201        let params = ConnectParams::default();
2202        tokio::spawn(mock_plc_status(server_io, 0x00));
2203        let client = S7Client::from_transport(client_io, params).await.unwrap();
2204        let status = client.get_plc_status().await.unwrap();
2205        assert_eq!(status, crate::types::PlcStatus::Unknown);
2206    }
2207
2208    #[tokio::test]
2209    async fn get_plc_status_unknown_byte_returns_error() {
2210        let (client_io, server_io) = duplex(4096);
2211        let params = ConnectParams::default();
2212        tokio::spawn(mock_plc_status(server_io, 0xFF));
2213        let client = S7Client::from_transport(client_io, params).await.unwrap();
2214        let result = client.get_plc_status().await;
2215        assert!(result.is_err());
2216    }
2217}