Skip to main content

snap7_client/
client.rs

1use bytes::{Buf, Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::{
7    cotp::CotpPdu,
8    s7::{
9        clock::PlcDateTime,
10        header::{Area, PduType, S7Header, TransportSize},
11        read_var::{AddressItem, ReadVarRequest, ReadVarResponse},
12        szl::{SzlRequest, SzlResponse},
13        write_var::{WriteItem, WriteVarRequest, WriteVarResponse},
14    },
15    tpkt::TpktFrame,
16};
17
18use crate::{
19    connection::{connect, Connection},
20    error::{Error, Result},
21    types::ConnectParams,
22};
23
/// A single item in a `read_multi_vars` request.
///
/// Describes one region to read: where it lives (`area`, `db_number`, `start`) and how
/// much to fetch (`length`).
#[derive(Debug, Clone)]
pub struct MultiReadItem {
    /// Memory area the region belongs to.
    pub area: Area,
    /// Data block number copied into the request's address item.
    // NOTE(review): presumably only meaningful for `Area::DataBlock` — confirm.
    pub db_number: u16,
    /// Start offset copied into the request's address item.
    pub start: u32,
    /// Requested length; `read_multi_vars` sends it together with `TransportSize::Byte`.
    pub length: u16,
    /// Declared transport size of the item.
    // NOTE(review): `read_multi_vars` always requests `TransportSize::Byte` and does not
    // read this field — confirm the intended use.
    pub transport: TransportSize,
}
33
34impl MultiReadItem {
35    /// Convenience constructor for a DataBlock byte read.
36    pub fn db(db: u16, start: u32, length: u16) -> Self {
37        Self {
38            area: Area::DataBlock,
39            db_number: db,
40            start,
41            length,
42            transport: TransportSize::Byte,
43        }
44    }
45}
46
/// A single item in a `write_multi_vars` request.
///
/// Describes one region to write: where it lives (`area`, `db_number`, `start`) and the
/// bytes to store there (`data`).
#[derive(Debug, Clone)]
pub struct MultiWriteItem {
    /// Memory area the region belongs to.
    pub area: Area,
    /// Data block number copied into the request's address item.
    // NOTE(review): presumably only meaningful for `Area::DataBlock` — confirm.
    pub db_number: u16,
    /// Start offset copied into the request's address item.
    pub start: u32,
    /// Payload to write; its length is sent as the item length with `TransportSize::Byte`.
    pub data: Bytes,
}
55
56impl MultiWriteItem {
57    /// Convenience constructor for a DataBlock byte write.
58    pub fn db(db: u16, start: u32, data: impl Into<Bytes>) -> Self {
59        Self {
60            area: Area::DataBlock,
61            db_number: db,
62            start,
63            data: data.into(),
64        }
65    }
66}
67
/// Mutable client state, kept behind the client's mutex.
struct Inner<T> {
    /// Underlying byte stream used for every read and write.
    transport: T,
    /// Result of the connection handshake; its `pdu_size` bounds multi-item batches.
    connection: Connection,
    /// Last PDU reference handed out; incremented (wrapping) before each request.
    pdu_ref: u16,
}
73
/// Asynchronous S7 client over any `AsyncRead + AsyncWrite` transport.
///
/// All mutable state lives behind a [`tokio::sync::Mutex`], so the request methods take
/// `&self`; each of them holds the lock for its whole request/response exchange.
pub struct S7Client<T: AsyncRead + AsyncWrite + Unpin + Send> {
    /// Transport, negotiated connection and PDU reference counter.
    inner: Mutex<Inner<T>>,
    /// Parameters the client was created with; stored but not read again in this file.
    #[allow(dead_code)]
    params: ConnectParams,
}
79
80impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7Client<T> {
81    pub async fn from_transport(transport: T, params: ConnectParams) -> Result<Self> {
82        let mut t = transport;
83        let connection = connect(&mut t, &params).await?;
84        Ok(S7Client {
85            inner: Mutex::new(Inner {
86                transport: t,
87                connection,
88                pdu_ref: 1,
89            }),
90            params,
91        })
92    }
93
94    fn next_pdu_ref(inner: &mut Inner<T>) -> u16 {
95        inner.pdu_ref = inner.pdu_ref.wrapping_add(1);
96        inner.pdu_ref
97    }
98
99    async fn send_s7(
100        inner: &mut Inner<T>,
101        param_buf: Bytes,
102        data_buf: Bytes,
103        pdu_ref: u16,
104        pdu_type: PduType,
105    ) -> Result<()> {
106        let header = S7Header {
107            pdu_type,
108            reserved: 0,
109            pdu_ref,
110            param_len: param_buf.len() as u16,
111            data_len: data_buf.len() as u16,
112            error_class: None,
113            error_code: None,
114        };
115        let mut s7b = BytesMut::new();
116        header.encode(&mut s7b);
117        s7b.extend_from_slice(&param_buf);
118        s7b.extend_from_slice(&data_buf);
119
120        let dt = CotpPdu::Data {
121            tpdu_nr: 0,
122            last: true,
123            payload: s7b.freeze(),
124        };
125        let mut cotpb = BytesMut::new();
126        dt.encode(&mut cotpb);
127        let tpkt = TpktFrame {
128            payload: cotpb.freeze(),
129        };
130        let mut tb = BytesMut::new();
131        tpkt.encode(&mut tb)?;
132        inner.transport.write_all(&tb).await?;
133        Ok(())
134    }
135
136    async fn recv_s7(inner: &mut Inner<T>) -> Result<(S7Header, Bytes)> {
137        let mut tpkt_hdr = [0u8; 4];
138        inner.transport.read_exact(&mut tpkt_hdr).await?;
139        let total = u16::from_be_bytes([tpkt_hdr[2], tpkt_hdr[3]]) as usize;
140        if total < 4 {
141            return Err(Error::UnexpectedResponse);
142        }
143        let mut payload = vec![0u8; total - 4];
144        inner.transport.read_exact(&mut payload).await?;
145        let mut b = Bytes::from(payload);
146
147        // COTP DT header: LI (1) + code (1) + tpdu_nr (1)
148        if b.remaining() < 3 {
149            return Err(Error::UnexpectedResponse);
150        }
151        let _li = b.get_u8();
152        let cotp_code = b.get_u8();
153        if cotp_code != 0xF0 {
154            return Err(Error::UnexpectedResponse);
155        }
156        b.advance(1); // tpdu_nr byte
157
158        let header = S7Header::decode(&mut b)?;
159        Ok((header, b))
160    }
161
162    pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes> {
163        let mut inner = self.inner.lock().await;
164        let pdu_ref = Self::next_pdu_ref(&mut inner);
165
166        let req = ReadVarRequest {
167            items: vec![AddressItem {
168                area: Area::DataBlock,
169                db_number: db,
170                start,
171                bit_offset: 0,
172                length,
173                transport: TransportSize::Byte,
174            }],
175        };
176        let mut param_buf = BytesMut::new();
177        req.encode(&mut param_buf);
178
179        Self::send_s7(
180            &mut inner,
181            param_buf.freeze(),
182            Bytes::new(),
183            pdu_ref,
184            PduType::Job,
185        )
186        .await?;
187
188        let (header, mut body) = Self::recv_s7(&mut inner).await?;
189        check_plc_error(&header, "db_read")?;
190        if body.remaining() >= 2 {
191            body.advance(2); // skip param echo: func + item count
192        }
193        let resp = ReadVarResponse::decode(&mut body, 1)?;
194        if resp.items.is_empty() {
195            return Err(Error::UnexpectedResponse);
196        }
197        if resp.items[0].return_code != 0xFF {
198            return Err(Error::PlcError {
199                code: resp.items[0].return_code as u32,
200                message: "item error".into(),
201            });
202        }
203        Ok(resp.items[0].data.clone())
204    }
205
206    /// Read multiple PLC regions in one or more S7 PDU exchanges.
207    ///
208    /// Automatically batches items when the item count would exceed the Siemens hard
209    /// limit of 20 per PDU, or when the encoded request or response would exceed the
210    /// negotiated PDU size. Returns one `Bytes` per item in input order.
211    ///
212    /// Unlike `db_read`, this accepts any `Area` and `TransportSize`.
213    pub async fn read_multi_vars(&self, items: &[MultiReadItem]) -> Result<Vec<Bytes>> {
214        if items.is_empty() {
215            return Ok(Vec::new());
216        }
217
218        // PDU size constants (in bytes)
219        // S7 header: 10, func+count: 2, per-item address: 12
220        const S7_HEADER: usize = 10;
221        const PARAM_OVERHEAD: usize = 2; // func + item count
222        const ADDR_ITEM_SIZE: usize = 12;
223        // Response data item: 4 header + data + 0/1 pad
224        const DATA_ITEM_OVERHEAD: usize = 4;
225        const MAX_ITEMS_PER_PDU: usize = 20;
226
227        let mut inner = self.inner.lock().await;
228        let pdu_size = inner.connection.pdu_size as usize;
229        let max_req_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
230        let max_resp_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
231
232        let mut results = vec![Bytes::new(); items.len()];
233        let mut batch_start = 0;
234
235        while batch_start < items.len() {
236            // Build a batch that fits within PDU limits
237            let mut batch_end = batch_start;
238            let mut req_bytes_used = 0usize;
239            let mut resp_bytes_used = 0usize;
240
241            while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
242                let item = &items[batch_end];
243                let item_resp_size =
244                    DATA_ITEM_OVERHEAD + item.length as usize + (item.length as usize % 2);
245
246                if batch_end > batch_start
247                    && (req_bytes_used + ADDR_ITEM_SIZE > max_req_payload
248                        || resp_bytes_used + item_resp_size > max_resp_payload)
249                {
250                    break;
251                }
252                req_bytes_used += ADDR_ITEM_SIZE;
253                resp_bytes_used += item_resp_size;
254                batch_end += 1;
255            }
256
257            let batch = &items[batch_start..batch_end];
258            let pdu_ref = Self::next_pdu_ref(&mut inner);
259
260            let req = ReadVarRequest {
261                items: batch
262                    .iter()
263                    .map(|item| AddressItem {
264                        area: item.area,
265                        db_number: item.db_number,
266                        start: item.start,
267                        bit_offset: 0,
268                        // Siemens requires Byte transport + byte-count length in the request.
269                        // The item's declared transport is only used to decode the response.
270                        length: item.length,
271                        transport: TransportSize::Byte,
272                    })
273                    .collect(),
274            };
275            let mut param_buf = BytesMut::new();
276            req.encode(&mut param_buf);
277
278            Self::send_s7(
279                &mut inner,
280                param_buf.freeze(),
281                Bytes::new(),
282                pdu_ref,
283                PduType::Job,
284            )
285            .await?;
286
287            let (header, mut body) = Self::recv_s7(&mut inner).await?;
288            check_plc_error(&header, "read_multi_vars")?;
289            if body.remaining() >= 2 {
290                body.advance(2); // skip func + item_count echo
291            }
292            let resp = ReadVarResponse::decode(&mut body, batch.len())?;
293
294            for (i, item) in resp.items.into_iter().enumerate() {
295                if item.return_code != 0xFF {
296                    return Err(Error::PlcError {
297                        code: item.return_code as u32,
298                        message: format!("item {} error", batch_start + i),
299                    });
300                }
301                results[batch_start + i] = item.data;
302            }
303
304            batch_start = batch_end;
305        }
306
307        Ok(results)
308    }
309
310    /// Write multiple PLC regions in one or more S7 PDU exchanges.
311    ///
312    /// Automatically batches items when the count or encoded size would exceed the
313    /// negotiated PDU size or the Siemens hard limit of 20 items per PDU.
314    /// Returns `Ok(())` only when all items are acknowledged with return code 0xFF.
315    pub async fn write_multi_vars(&self, items: &[MultiWriteItem]) -> Result<()> {
316        if items.is_empty() {
317            return Ok(());
318        }
319
320        const S7_HEADER: usize = 10;
321        const PARAM_OVERHEAD: usize = 2; // func + item count
322        const ADDR_ITEM_SIZE: usize = 12;
323        const DATA_ITEM_OVERHEAD: usize = 4; // reserved + transport + bit_len (2)
324        const MAX_ITEMS_PER_PDU: usize = 20;
325
326        let mut inner = self.inner.lock().await;
327        let pdu_size = inner.connection.pdu_size as usize;
328        let max_payload = pdu_size.saturating_sub(S7_HEADER + PARAM_OVERHEAD);
329
330        let mut batch_start = 0;
331
332        while batch_start < items.len() {
333            let mut batch_end = batch_start;
334            let mut bytes_used = 0usize;
335
336            while batch_end < items.len() && (batch_end - batch_start) < MAX_ITEMS_PER_PDU {
337                let item = &items[batch_end];
338                let data_len = item.data.len();
339                let item_size = ADDR_ITEM_SIZE + DATA_ITEM_OVERHEAD + data_len + (data_len % 2);
340
341                if batch_end > batch_start && bytes_used + item_size > max_payload {
342                    break;
343                }
344                bytes_used += item_size;
345                batch_end += 1;
346            }
347
348            let batch = &items[batch_start..batch_end];
349            let pdu_ref = Self::next_pdu_ref(&mut inner);
350
351            let req = WriteVarRequest {
352                items: batch
353                    .iter()
354                    .map(|item| WriteItem {
355                        address: AddressItem {
356                            area: item.area,
357                            db_number: item.db_number,
358                            start: item.start,
359                            bit_offset: 0,
360                            length: item.data.len() as u16,
361                            transport: TransportSize::Byte,
362                        },
363                        data: item.data.clone(),
364                    })
365                    .collect(),
366            };
367            let mut param_buf = BytesMut::new();
368            req.encode(&mut param_buf);
369
370            Self::send_s7(
371                &mut inner,
372                param_buf.freeze(),
373                Bytes::new(),
374                pdu_ref,
375                PduType::Job,
376            )
377            .await?;
378
379            let (header, mut body) = Self::recv_s7(&mut inner).await?;
380            check_plc_error(&header, "write_multi_vars")?;
381            if body.remaining() >= 2 {
382                body.advance(2); // skip func + item_count echo
383            }
384            let resp = WriteVarResponse::decode(&mut body, batch.len())?;
385            for (i, &code) in resp.return_codes.iter().enumerate() {
386                if code != 0xFF {
387                    return Err(Error::PlcError {
388                        code: code as u32,
389                        message: format!("item {} write error", batch_start + i),
390                    });
391                }
392            }
393
394            batch_start = batch_end;
395        }
396
397        Ok(())
398    }
399
400    pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<()> {
401        let mut inner = self.inner.lock().await;
402        let pdu_ref = Self::next_pdu_ref(&mut inner);
403
404        let req = WriteVarRequest {
405            items: vec![WriteItem {
406                address: AddressItem {
407                    area: Area::DataBlock,
408                    db_number: db,
409                    start,
410                    bit_offset: 0,
411                    length: data.len() as u16,
412                    transport: TransportSize::Byte,
413                },
414                data: Bytes::copy_from_slice(data),
415            }],
416        };
417        let mut param_buf = BytesMut::new();
418        req.encode(&mut param_buf);
419
420        Self::send_s7(
421            &mut inner,
422            param_buf.freeze(),
423            Bytes::new(),
424            pdu_ref,
425            PduType::Job,
426        )
427        .await?;
428
429        let (header, mut body) = Self::recv_s7(&mut inner).await?;
430        check_plc_error(&header, "db_write")?;
431        if body.has_remaining() {
432            body.advance(2); // skip func + item count
433        }
434        let resp = WriteVarResponse::decode(&mut body, 1)?;
435        if resp.return_codes[0] != 0xFF {
436            return Err(Error::PlcError {
437                code: resp.return_codes[0] as u32,
438                message: "write error".into(),
439            });
440        }
441        Ok(())
442    }
443
444    pub async fn read_szl(&self, szl_id: u16, szl_index: u16) -> Result<SzlResponse> {
445        let mut inner = self.inner.lock().await;
446        let pdu_ref = Self::next_pdu_ref(&mut inner);
447
448        let req = SzlRequest { szl_id, szl_index };
449        let mut param_buf = BytesMut::new();
450        req.encode(&mut param_buf);
451
452        Self::send_s7(
453            &mut inner,
454            param_buf.freeze(),
455            Bytes::new(),
456            pdu_ref,
457            PduType::UserData,
458        )
459        .await?;
460
461        let (_header, mut body) = Self::recv_s7(&mut inner).await?;
462        if body.remaining() > 12 {
463            body.advance(body.remaining() - 12);
464        }
465        Ok(SzlResponse::decode(&mut body)?)
466    }
467
468    pub async fn read_clock(&self) -> Result<PlcDateTime> {
469        let mut inner = self.inner.lock().await;
470        let pdu_ref = Self::next_pdu_ref(&mut inner);
471        let mut param_buf = BytesMut::new();
472        param_buf.extend_from_slice(&[0x00, 0x01, 0x12, 0x04, 0xF5, 0x00]);
473        Self::send_s7(
474            &mut inner,
475            param_buf.freeze(),
476            Bytes::new(),
477            pdu_ref,
478            PduType::UserData,
479        )
480        .await?;
481        let (_header, mut body) = Self::recv_s7(&mut inner).await?;
482        if body.remaining() > 8 {
483            body.advance(body.remaining() - 8);
484        }
485        Ok(PlcDateTime::decode(&mut body)?)
486    }
487}
488
489fn check_plc_error(header: &S7Header, context: &str) -> Result<()> {
490    if let (Some(ec), Some(ecd)) = (header.error_class, header.error_code) {
491        if ec != 0 || ecd != 0 {
492            return Err(Error::PlcError {
493                code: ((ec as u32) << 8) | ecd as u32,
494                message: format!("{} error", context),
495            });
496        }
497    }
498    Ok(())
499}
500
501impl S7Client<crate::transport::TcpTransport> {
502    pub async fn connect(addr: SocketAddr, params: ConnectParams) -> Result<Self> {
503        let transport =
504            crate::transport::TcpTransport::connect(addr, params.connect_timeout).await?;
505        Self::from_transport(transport, params).await
506    }
507}
508
509#[cfg(test)]
510mod tests {
511    use super::*;
512    use bytes::BufMut;
513    use crate::proto::{
514        cotp::CotpPdu,
515        s7::{
516            header::{PduType, S7Header},
517            negotiate::NegotiateResponse,
518        },
519        tpkt::TpktFrame,
520    };
521    use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
522
523    async fn mock_plc_db_read(mut server_io: tokio::io::DuplexStream, response_data: Vec<u8>) {
524        let mut buf = vec![0u8; 4096];
525
526        // respond to COTP CR
527        let _ = server_io.read(&mut buf).await;
528        let cc = CotpPdu::ConnectConfirm {
529            dst_ref: 1,
530            src_ref: 1,
531        };
532        let mut cb = BytesMut::new();
533        cc.encode(&mut cb);
534        let mut tb = BytesMut::new();
535        TpktFrame {
536            payload: cb.freeze(),
537        }
538        .encode(&mut tb)
539        .unwrap();
540        server_io.write_all(&tb).await.unwrap();
541
542        // respond to S7 negotiate
543        let _ = server_io.read(&mut buf).await;
544        let neg = NegotiateResponse {
545            max_amq_calling: 1,
546            max_amq_called: 1,
547            pdu_length: 480,
548        };
549        let mut s7b = BytesMut::new();
550        S7Header {
551            pdu_type: PduType::AckData,
552            reserved: 0,
553            pdu_ref: 1,
554            param_len: 8,
555            data_len: 0,
556            error_class: Some(0),
557            error_code: Some(0),
558        }
559        .encode(&mut s7b);
560        neg.encode(&mut s7b);
561        let dt = CotpPdu::Data {
562            tpdu_nr: 0,
563            last: true,
564            payload: s7b.freeze(),
565        };
566        let mut cb = BytesMut::new();
567        dt.encode(&mut cb);
568        let mut tb = BytesMut::new();
569        TpktFrame {
570            payload: cb.freeze(),
571        }
572        .encode(&mut tb)
573        .unwrap();
574        server_io.write_all(&tb).await.unwrap();
575
576        // respond to db_read
577        let _ = server_io.read(&mut buf).await;
578        let mut s7b = BytesMut::new();
579        S7Header {
580            pdu_type: PduType::AckData,
581            reserved: 0,
582            pdu_ref: 2,
583            param_len: 2,
584            data_len: (4 + response_data.len()) as u16,
585            error_class: Some(0),
586            error_code: Some(0),
587        }
588        .encode(&mut s7b);
589        s7b.extend_from_slice(&[0x04, 0x01]); // ReadVar func + 1 item
590        s7b.put_u8(0xFF); // return_code = success
591        s7b.put_u8(0x04); // transport = word
592        s7b.put_u16((response_data.len() * 8) as u16);
593        s7b.extend_from_slice(&response_data);
594        let dt = CotpPdu::Data {
595            tpdu_nr: 0,
596            last: true,
597            payload: s7b.freeze(),
598        };
599        let mut cb = BytesMut::new();
600        dt.encode(&mut cb);
601        let mut tb = BytesMut::new();
602        TpktFrame {
603            payload: cb.freeze(),
604        }
605        .encode(&mut tb)
606        .unwrap();
607        server_io.write_all(&tb).await.unwrap();
608    }
609
610    #[tokio::test]
611    async fn db_read_returns_data() {
612        let (client_io, server_io) = duplex(4096);
613        let params = ConnectParams::default();
614        let expected = vec![0xDE, 0xAD, 0xBE, 0xEF];
615        tokio::spawn(mock_plc_db_read(server_io, expected.clone()));
616        let client = S7Client::from_transport(client_io, params).await.unwrap();
617        let data = client.db_read(1, 0, 4).await.unwrap();
618        assert_eq!(&data[..], &expected[..]);
619    }
620
621    /// Mock that handles COTP+Negotiate handshake then serves one multi-read response.
622    async fn mock_plc_multi_read(
623        mut server_io: tokio::io::DuplexStream,
624        items: Vec<Vec<u8>>, // one byte vec per item
625    ) {
626        let mut buf = vec![0u8; 4096];
627
628        // COTP CR
629        let _ = server_io.read(&mut buf).await;
630        let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
631        let mut cb = BytesMut::new();
632        cc.encode(&mut cb);
633        let mut tb = BytesMut::new();
634        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
635        server_io.write_all(&tb).await.unwrap();
636
637        // S7 Negotiate
638        let _ = server_io.read(&mut buf).await;
639        let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: 480 };
640        let mut s7b = BytesMut::new();
641        S7Header {
642            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
643            param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
644        }.encode(&mut s7b);
645        neg.encode(&mut s7b);
646        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
647        let mut cb = BytesMut::new(); dt.encode(&mut cb);
648        let mut tb = BytesMut::new();
649        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
650        server_io.write_all(&tb).await.unwrap();
651
652        // ReadMultiVar request
653        let _ = server_io.read(&mut buf).await;
654
655        // Build response data: one DataItem per input item
656        let item_count = items.len() as u8;
657        let mut data_bytes = BytesMut::new();
658        for item_data in &items {
659            data_bytes.put_u8(0xFF); // return_code OK
660            data_bytes.put_u8(0x04); // transport byte
661            data_bytes.put_u16((item_data.len() * 8) as u16);
662            data_bytes.extend_from_slice(item_data);
663            if item_data.len() % 2 != 0 {
664                data_bytes.put_u8(0x00); // pad
665            }
666        }
667        let data_len = data_bytes.len() as u16;
668        let mut s7b = BytesMut::new();
669        S7Header {
670            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 2,
671            param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
672        }.encode(&mut s7b);
673        s7b.extend_from_slice(&[0x04, item_count]); // func + item_count
674        s7b.extend_from_slice(&data_bytes);
675
676        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
677        let mut cb = BytesMut::new(); dt.encode(&mut cb);
678        let mut tb = BytesMut::new();
679        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
680        server_io.write_all(&tb).await.unwrap();
681    }
682
683    #[tokio::test]
684    async fn read_multi_vars_returns_all_items() {
685        let (client_io, server_io) = duplex(4096);
686        let params = ConnectParams::default();
687        let item1 = vec![0xDE, 0xAD, 0xBE, 0xEF];
688        let item2 = vec![0x01, 0x02];
689        tokio::spawn(mock_plc_multi_read(server_io, vec![item1.clone(), item2.clone()]));
690        let client = S7Client::from_transport(client_io, params).await.unwrap();
691        let items = [MultiReadItem::db(1, 0, 4), MultiReadItem::db(2, 10, 2)];
692        let results = client.read_multi_vars(&items).await.unwrap();
693        assert_eq!(results.len(), 2);
694        assert_eq!(&results[0][..], &item1[..]);
695        assert_eq!(&results[1][..], &item2[..]);
696    }
697
698    #[tokio::test]
699    async fn read_multi_vars_empty_returns_empty() {
700        let (client_io, server_io) = duplex(4096);
701        let params = ConnectParams::default();
702        tokio::spawn(mock_plc_multi_read(server_io, vec![]));
703        let client = S7Client::from_transport(client_io, params).await.unwrap();
704        let results = client.read_multi_vars(&[]).await.unwrap();
705        assert!(results.is_empty());
706    }
707
708    /// Mock that handles COTP+Negotiate then serves N write-response round-trips.
709    /// `batches` is a list of item counts per round-trip; the mock sends 0xFF for each.
710    async fn mock_plc_multi_write(
711        mut server_io: tokio::io::DuplexStream,
712        pdu_size: u16,
713        batches: Vec<usize>,
714    ) {
715        let mut buf = vec![0u8; 65536];
716
717        // COTP CR
718        let _ = server_io.read(&mut buf).await;
719        let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
720        let mut cb = BytesMut::new(); cc.encode(&mut cb);
721        let mut tb = BytesMut::new();
722        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
723        server_io.write_all(&tb).await.unwrap();
724
725        // S7 Negotiate
726        let _ = server_io.read(&mut buf).await;
727        let neg = NegotiateResponse { max_amq_calling: 1, max_amq_called: 1, pdu_length: pdu_size };
728        let mut s7b = BytesMut::new();
729        S7Header {
730            pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
731            param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
732        }.encode(&mut s7b);
733        neg.encode(&mut s7b);
734        let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
735        let mut cb = BytesMut::new(); dt.encode(&mut cb);
736        let mut tb = BytesMut::new();
737        TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
738        server_io.write_all(&tb).await.unwrap();
739
740        // One round-trip per batch
741        for (i, item_count) in batches.iter().enumerate() {
742            let _ = server_io.read(&mut buf).await;
743            // WriteVar response: param = func(0x05) + count; data = return_code per item
744            let mut s7b = BytesMut::new();
745            S7Header {
746                pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
747                param_len: 2, data_len: *item_count as u16,
748                error_class: Some(0), error_code: Some(0),
749            }.encode(&mut s7b);
750            s7b.extend_from_slice(&[0x05, *item_count as u8]); // func + count
751            for _ in 0..*item_count {
752                s7b.put_u8(0xFF); // success
753            }
754            let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
755            let mut cb = BytesMut::new(); dt.encode(&mut cb);
756            let mut tb = BytesMut::new();
757            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
758            server_io.write_all(&tb).await.unwrap();
759        }
760    }
761
762    #[tokio::test]
763    async fn write_multi_vars_returns_ok() {
764        let (client_io, server_io) = duplex(65536);
765        let params = ConnectParams::default();
766        tokio::spawn(mock_plc_multi_write(server_io, 480, vec![2]));
767        let client = S7Client::from_transport(client_io, params).await.unwrap();
768        let items = [
769            MultiWriteItem::db(1, 0, vec![0xAA, 0xBB, 0xCC, 0xDD]),
770            MultiWriteItem::db(2, 10, vec![0x01, 0x02]),
771        ];
772        client.write_multi_vars(&items).await.unwrap();
773    }
774
775    #[tokio::test]
776    async fn write_multi_vars_empty_returns_ok() {
777        let (client_io, server_io) = duplex(4096);
778        let params = ConnectParams::default();
779        // No messages exchanged after handshake — the mock just needs to satisfy connect.
780        tokio::spawn(mock_plc_multi_write(server_io, 480, vec![]));
781        let client = S7Client::from_transport(client_io, params).await.unwrap();
782        client.write_multi_vars(&[]).await.unwrap();
783    }
784
785    /// Items split into two round-trips when PDU budget is exhausted.
786    ///
787    /// PDU = 64. max_payload = 64 - 10(hdr) - 2(overhead) = 52.
788    /// Each item: 12(addr) + 4(data hdr) + 20(data) = 36.
789    /// Two items = 72 > 52 → must split into two 1-item batches.
790    #[tokio::test]
791    async fn write_multi_vars_batches_when_pdu_limit_exceeded() {
792        let (client_io, server_io) = duplex(65536);
793        let params = ConnectParams::default();
794        tokio::spawn(mock_plc_multi_write(server_io, 64, vec![1, 1]));
795        let client = S7Client::from_transport(client_io, params).await.unwrap();
796        let items = [
797            MultiWriteItem::db(1, 0, vec![0x11u8; 20]),
798            MultiWriteItem::db(2, 0, vec![0x22u8; 20]),
799        ];
800        client.write_multi_vars(&items).await.unwrap();
801    }
802
803    /// Items are split into two round trips when response would exceed the negotiated PDU size.
804    ///
805    /// PDU = 64 bytes. max_resp_payload = 64 - 10(hdr) - 2(func+count) = 52 bytes.
806    /// Each item with 30 bytes of data costs 4+30 = 34 bytes in the response.
807    /// Two such items = 68 bytes → exceeds 52 → must split into 2 round trips.
808    #[tokio::test]
809    async fn read_multi_vars_batches_when_pdu_limit_exceeded() {
810        use crate::proto::s7::negotiate::NegotiateResponse;
811
812        async fn mock_split_pdu(mut server_io: tokio::io::DuplexStream) {
813            let mut buf = vec![0u8; 4096];
814
815            // COTP CR
816            let _ = server_io.read(&mut buf).await;
817            let cc = CotpPdu::ConnectConfirm { dst_ref: 1, src_ref: 1 };
818            let mut cb = BytesMut::new(); cc.encode(&mut cb);
819            let mut tb = BytesMut::new();
820            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
821            server_io.write_all(&tb).await.unwrap();
822
823            // Negotiate — PDU size 64
824            let _ = server_io.read(&mut buf).await;
825            let neg = NegotiateResponse {
826                max_amq_calling: 1, max_amq_called: 1, pdu_length: 64,
827            };
828            let mut s7b = BytesMut::new();
829            S7Header {
830                pdu_type: PduType::AckData, reserved: 0, pdu_ref: 1,
831                param_len: 8, data_len: 0, error_class: Some(0), error_code: Some(0),
832            }.encode(&mut s7b);
833            neg.encode(&mut s7b);
834            let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
835            let mut cb = BytesMut::new(); dt.encode(&mut cb);
836            let mut tb = BytesMut::new();
837            TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
838            server_io.write_all(&tb).await.unwrap();
839
840            // Two separate round-trips, one item each
841            let payloads: &[&[u8]] = &[&[0x11u8; 30], &[0x22u8; 30]];
842            for (i, payload) in payloads.iter().enumerate() {
843                let _ = server_io.read(&mut buf).await;
844                let bit_len = (payload.len() * 8) as u16;
845                let mut data_bytes = BytesMut::new();
846                data_bytes.put_u8(0xFF);
847                data_bytes.put_u8(0x04);
848                data_bytes.put_u16(bit_len);
849                data_bytes.extend_from_slice(payload);
850                if payload.len() % 2 != 0 { data_bytes.put_u8(0x00); }
851                let data_len = data_bytes.len() as u16;
852                let mut s7b = BytesMut::new();
853                S7Header {
854                    pdu_type: PduType::AckData, reserved: 0, pdu_ref: (i + 2) as u16,
855                    param_len: 2, data_len, error_class: Some(0), error_code: Some(0),
856                }.encode(&mut s7b);
857                s7b.extend_from_slice(&[0x04, 0x01]);
858                s7b.extend_from_slice(&data_bytes);
859                let dt = CotpPdu::Data { tpdu_nr: 0, last: true, payload: s7b.freeze() };
860                let mut cb = BytesMut::new(); dt.encode(&mut cb);
861                let mut tb = BytesMut::new();
862                TpktFrame { payload: cb.freeze() }.encode(&mut tb).unwrap();
863                server_io.write_all(&tb).await.unwrap();
864            }
865        }
866
867        let (client_io, server_io) = duplex(4096);
868        let params = ConnectParams::default();
869        tokio::spawn(mock_split_pdu(server_io));
870        let client = S7Client::from_transport(client_io, params).await.unwrap();
871
872        let items = [MultiReadItem::db(1, 0, 30), MultiReadItem::db(2, 0, 30)];
873        let results = client.read_multi_vars(&items).await.unwrap();
874        assert_eq!(results.len(), 2);
875        assert_eq!(&results[0][..], &[0x11u8; 30][..]);
876        assert_eq!(&results[1][..], &[0x22u8; 30][..]);
877    }
878}