Skip to main content

snap7_client/
plus_client.rs

1use bytes::{Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
7use crate::proto::s7commplus::multivar::{
8    GetMultiVarRequest, GetMultiVarResponse, SetMultiVarRequest, SetVarItem, VarSpec,
9};
10use crate::proto::s7commplus::session::{FC_DELETE_OBJECT, OPCODE_REQUEST};
11use crate::proto::s7commplus::data::DataArea;
12use crate::proto::tpkt::TpktFrame;
13
14use crate::error::Error;
15use crate::plus_connection::{plus_connect, PlusConnection};
16use crate::tls::{tls_connect, TlsStream};
17use crate::transport::TcpTransport;
18
19/// Inner mutable state for an `S7PlusClient`.
20pub(crate) struct PlusInner<T> {
21    pub transport: T,
22    pub conn: PlusConnection,
23}
24
25/// A client for S7CommPlus (S7-1200/1500 "integrity mode") communication.
26pub struct S7PlusClient<T: AsyncRead + AsyncWrite + Unpin + Send> {
27    pub(crate) inner: Mutex<PlusInner<T>>,
28}
29
30fn db_lid(db: u16, byte_offset: u32) -> (u32, u32) {
31    let crc = 0x48u32;
32    let lid = 0x8400_0000u32 | ((db as u32) << 12) | (byte_offset & 0xFFF);
33    (crc, lid)
34}
35
36/// Specification for a single DB variable to be read or written in batch.
37#[derive(Debug, Clone)]
38pub struct DbVarSpec {
39    pub db: u16,
40    pub offset: u32,
41    pub length: u16,
42}
43
44// ---------------------------------------------------------------------------
45// Generic transport methods
46// ---------------------------------------------------------------------------
47
48impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7PlusClient<T> {
49    /// Read `length` bytes from DB `db` at byte offset `start`.
50    pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes, Error> {
51        let r = self.read_multi_vars(&[DbVarSpec { db, offset: start, length }]).await?;
52        Ok(r.into_iter().next().unwrap_or_default())
53    }
54
55    /// Write `data` to DB `db` at byte offset `start`.
56    pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<(), Error> {
57        self.write_multi_vars(&[DbVarSpec { db, offset: start, length: data.len() as u16 }], &[Bytes::copy_from_slice(data)]).await
58    }
59
60    /// Read multiple DB variables in a single S7CommPlus PDU.
61    ///
62    /// Each `DbVarSpec` specifies the DB number, byte offset, and read length.
63    /// Returns one `Bytes` per input spec in the same order.
64    pub async fn read_multi_vars(&self, specs: &[DbVarSpec]) -> Result<Vec<Bytes>, Error> {
65        if specs.is_empty() {
66            return Ok(Vec::new());
67        }
68        let mut inner = self.inner.lock().await;
69        let seqnum = inner.conn.seqnum;
70        inner.conn.seqnum = seqnum.wrapping_add(1);
71        let items: Vec<VarSpec> = specs
72            .iter()
73            .map(|s| {
74                let (crc, lid) = db_lid(s.db, s.offset);
75                VarSpec { crc, lid }
76            })
77            .collect();
78
79        let req = GetMultiVarRequest {
80            seqnum,
81            session_id: inner.conn.session_id,
82            items,
83        };
84        let mut da = BytesMut::new();
85        req.encode(&mut da);
86
87        let version = inner.conn.version.clone();
88        send_plus(&mut inner.transport, version, da.freeze()).await?;
89        let data = recv_plus_data(&mut inner.transport).await?;
90        let mut b = data;
91        let resp = GetMultiVarResponse::decode(&mut b, specs.len()).map_err(Error::Proto)?;
92        let results: Vec<Bytes> = resp
93            .items
94            .into_iter()
95            .zip(specs.iter())
96            .map(|(item, spec)| {
97                let len = spec.length as usize;
98                if item.value.len() >= len {
99                    item.value.slice(..len)
100                } else {
101                    item.value
102                }
103            })
104            .collect();
105        Ok(results)
106    }
107
108    /// Write multiple DB variables in a single S7CommPlus PDU.
109    ///
110    /// `specs` describes where to write, and `values` provides the data (one per spec).
111    pub async fn write_multi_vars(&self, specs: &[DbVarSpec], values: &[Bytes]) -> Result<(), Error> {
112        if specs.is_empty() {
113            return Ok(());
114        }
115        let mut inner = self.inner.lock().await;
116        let seqnum = inner.conn.seqnum;
117        inner.conn.seqnum = seqnum.wrapping_add(1);
118        let items: Vec<SetVarItem> = specs
119            .iter()
120            .zip(values.iter())
121            .map(|(s, v)| {
122                let (crc, lid) = db_lid(s.db, s.offset);
123                SetVarItem {
124                    crc,
125                    lid,
126                    value: v.clone(),
127                }
128            })
129            .collect();
130
131        let req = SetMultiVarRequest {
132            seqnum,
133            session_id: inner.conn.session_id,
134            items,
135        };
136        let mut da = BytesMut::new();
137        req.encode(&mut da);
138
139        let version = inner.conn.version.clone();
140        send_plus(&mut inner.transport, version, da.freeze()).await?;
141        let _data = recv_plus_data(&mut inner.transport).await?;
142        Ok(())
143    }
144
145    /// Send a KeepAlive frame to maintain the S7CommPlus session.
146    pub async fn send_keepalive(&self) -> Result<(), Error> {
147        let mut inner = self.inner.lock().await;
148        let frame = S7PlusFrame {
149            version: Version::KeepAlive,
150            data: Bytes::new(),
151        };
152        let mut fb = BytesMut::new();
153        frame.encode(&mut fb).map_err(Error::Proto)?;
154        let tpkt = TpktFrame {
155            payload: fb.freeze(),
156        };
157        let mut tb = BytesMut::new();
158        tpkt.encode(&mut tb).map_err(Error::Proto)?;
159        inner.transport.write_all(&tb).await?;
160        Ok(())
161    }
162
163    /// Send a DeleteObject request to close the session on the PLC.
164    pub async fn delete_object(&self) -> Result<(), Error> {
165        let mut inner = self.inner.lock().await;
166        let seqnum = inner.conn.seqnum;
167        inner.conn.seqnum = seqnum.wrapping_add(1);
168
169        // DeleteObject uses the same DataArea / FC_DELETE_OBJECT
170        let da = DataArea {
171            opcode: OPCODE_REQUEST,
172            function_code: FC_DELETE_OBJECT,
173            seqnum,
174            session_id: inner.conn.session_id,
175            transport_flags: 0,
176            payload: Bytes::new(),
177        };
178        let mut buf = BytesMut::new();
179        da.encode(&mut buf);
180
181        let version = inner.conn.version.clone();
182        send_plus(&mut inner.transport, version, buf.freeze()).await?;
183        let _resp = recv_plus_data(&mut inner.transport).await?;
184        Ok(())
185    }
186}
187
188// ---------------------------------------------------------------------------
189// TCP transport
190// ---------------------------------------------------------------------------
191
192impl S7PlusClient<TcpTransport> {
193    /// Connect to a PLC at `addr` using the S7CommPlus CreateObject handshake.
194    pub async fn connect(
195        addr: SocketAddr,
196        params: crate::types::ConnectParams,
197    ) -> Result<Self, Error> {
198        let transport = TcpTransport::connect(addr, params.connect_timeout).await?;
199        let (conn, transport) = plus_connect(transport).await?;
200        Ok(S7PlusClient {
201            inner: Mutex::new(PlusInner { transport, conn }),
202        })
203    }
204}
205
206// ---------------------------------------------------------------------------
207// TLS transport
208// ---------------------------------------------------------------------------
209
210impl S7PlusClient<TlsStream> {
211    /// Connect to a PLC using TLS transport and the S7CommPlus handshake.
212    ///
213    /// `server_name` is used for TLS SNI.  `extra_ca_der` can be `None` to
214    /// use the system root store.
215    pub async fn connect_tls(
216        addr: SocketAddr,
217        server_name: &str,
218        extra_ca_der: Option<&[u8]>,
219        _params: crate::types::ConnectParams,
220    ) -> Result<Self, Error> {
221        let transport = tls_connect(addr, server_name, extra_ca_der).await?;
222        let (conn, transport) = plus_connect(transport).await?;
223        Ok(S7PlusClient {
224            inner: Mutex::new(PlusInner { transport, conn }),
225        })
226    }
227}
228
229// ---------------------------------------------------------------------------
230// Helper functions
231// ---------------------------------------------------------------------------
232
233async fn send_plus<T>(transport: &mut T, version: Version, data: Bytes) -> Result<(), Error>
234where
235    T: AsyncWrite + Unpin,
236{
237    let frame = S7PlusFrame { version, data };
238    let mut fb = BytesMut::new();
239    frame.encode(&mut fb).map_err(Error::Proto)?;
240    let tpkt = TpktFrame {
241        payload: fb.freeze(),
242    };
243    let mut tb = BytesMut::new();
244    tpkt.encode(&mut tb).map_err(Error::Proto)?;
245    transport.write_all(&tb).await?;
246    Ok(())
247}
248
249async fn recv_plus_data<T>(transport: &mut T) -> Result<Bytes, Error>
250where
251    T: AsyncRead + Unpin,
252{
253    let mut hdr = [0u8; 4];
254    transport.read_exact(&mut hdr).await?;
255    let total = u16::from_be_bytes([hdr[2], hdr[3]]) as usize;
256    let mut payload = vec![0u8; total.saturating_sub(4)];
257    transport.read_exact(&mut payload).await?;
258    let mut b = Bytes::from(payload);
259    let frame = S7PlusFrame::decode(&mut b).map_err(Error::Proto)?;
260    Ok(frame.data)
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use bytes::BufMut;
267    use tokio::io::AsyncWriteExt;
268
269    fn build_get_var_response(session_id: u32, seqnum: u16, value: &[u8]) -> Vec<u8> {
270        use bytes::BytesMut;
271        use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
272        use crate::proto::s7commplus::session::OPCODE_RESPONSE;
273        use crate::proto::tpkt::TpktFrame;
274
275        let mut da = BytesMut::new();
276        da.put_u8(OPCODE_RESPONSE);
277        da.put_u16(0x0000);
278        da.put_u16(0x054C); // FC_GET_MULTI_VAR
279        da.put_u16(0x0000);
280        da.put_u16(seqnum);
281        da.put_u32(session_id);
282        da.put_u8(0x00);
283        // payload: return_code(1) + len(2 BE) + value
284        da.put_u8(0x0A);
285        da.put_u16(value.len() as u16);
286        da.put_slice(value);
287
288        let frame = S7PlusFrame {
289            version: Version::V1,
290            data: da.freeze(),
291        };
292        let mut fb = BytesMut::new();
293        frame.encode(&mut fb).unwrap();
294        let tpkt = TpktFrame {
295            payload: fb.freeze(),
296        };
297        let mut tb = BytesMut::new();
298        tpkt.encode(&mut tb).unwrap();
299        tb.to_vec()
300    }
301
302    #[tokio::test]
303    async fn plus_db_read_returns_value() {
304        let session_id = 0x0000_0001_u32;
305        let value = [0x3F, 0x80, 0x00, 0x00]; // 1.0f32 BE
306        let response = build_get_var_response(session_id, 2, &value);
307
308        let (mut server, client_io) = tokio::io::duplex(4096);
309        tokio::spawn(async move {
310            let mut buf = vec![0u8; 4096];
311            let _ = tokio::io::AsyncReadExt::read(&mut server, &mut buf).await;
312            server.write_all(&response).await.unwrap();
313        });
314
315        let conn = PlusConnection {
316            session_id,
317            seqnum: 2,
318            version: crate::proto::s7commplus::frame::Version::V1,
319        };
320        let client = S7PlusClient {
321            inner: tokio::sync::Mutex::new(PlusInner {
322                transport: client_io,
323                conn,
324            }),
325        };
326        let data = client.db_read(1, 0, 4).await.unwrap();
327        assert_eq!(&data[..], &[0x3F, 0x80, 0x00, 0x00]);
328    }
329}