Skip to main content

snap7_client/
plus_client.rs

1use bytes::{Bytes, BytesMut};
2use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
3use tokio::sync::Mutex;
4
5use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
6use crate::proto::s7commplus::multivar::{GetVarRequest, GetVarResponse, SetVarRequest};
7use crate::proto::tpkt::TpktFrame;
8
9use crate::error::Error;
10use crate::plus_connection::{plus_connect, PlusConnection};
11use crate::transport::TcpTransport;
12
13/// Inner mutable state for an `S7PlusClient`.
14pub(crate) struct PlusInner<T> {
15    pub transport: T,
16    pub conn: PlusConnection,
17}
18
19/// A client for S7CommPlus (S7-1200/1500 "integrity mode") communication.
20pub struct S7PlusClient<T: AsyncRead + AsyncWrite + Unpin + Send> {
21    pub(crate) inner: Mutex<PlusInner<T>>,
22}
23
24fn db_lid(db: u16, byte_offset: u32) -> (u32, u32) {
25    let crc = 0x48u32;
26    let lid = 0x8400_0000u32 | ((db as u32) << 12) | (byte_offset & 0xFFF);
27    (crc, lid)
28}
29
30impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7PlusClient<T> {
31    /// Read `length` bytes from DB `db` at byte offset `start`.
32    pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes, Error> {
33        let mut inner = self.inner.lock().await;
34        let seqnum = inner.conn.seqnum;
35        inner.conn.seqnum = seqnum.wrapping_add(1);
36        let (crc, lid) = db_lid(db, start);
37
38        let req = GetVarRequest {
39            seqnum,
40            session_id: inner.conn.session_id,
41            crc,
42            lid,
43        };
44        let mut da = BytesMut::new();
45        req.encode(&mut da);
46
47        let version = inner.conn.version.clone();
48        send_plus(&mut inner.transport, version, da.freeze()).await?;
49        let data = recv_plus_data(&mut inner.transport).await?;
50        let mut b = data;
51        let resp = GetVarResponse::decode(&mut b).map_err(Error::Proto)?;
52        let len = length as usize;
53        if resp.value.len() >= len {
54            Ok(resp.value.slice(..len))
55        } else {
56            Ok(resp.value)
57        }
58    }
59
60    /// Write `data` to DB `db` at byte offset `start`.
61    pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<(), Error> {
62        let mut inner = self.inner.lock().await;
63        let seqnum = inner.conn.seqnum;
64        inner.conn.seqnum = seqnum.wrapping_add(1);
65        let (crc, lid) = db_lid(db, start);
66
67        let req = SetVarRequest {
68            seqnum,
69            session_id: inner.conn.session_id,
70            crc,
71            lid,
72            value: Bytes::copy_from_slice(data),
73        };
74        let mut da = BytesMut::new();
75        req.encode(&mut da);
76
77        let version = inner.conn.version.clone();
78        send_plus(&mut inner.transport, version, da.freeze()).await?;
79        let _resp = recv_plus_data(&mut inner.transport).await?;
80        Ok(())
81    }
82}
83
84impl S7PlusClient<TcpTransport> {
85    /// Connect to a PLC at `addr` using the S7CommPlus CreateObject handshake.
86    pub async fn connect(
87        addr: std::net::SocketAddr,
88        params: crate::types::ConnectParams,
89    ) -> Result<Self, Error> {
90        let transport = TcpTransport::connect(addr, params.connect_timeout).await?;
91        let (conn, transport) = plus_connect(transport).await?;
92        Ok(S7PlusClient {
93            inner: Mutex::new(PlusInner { transport, conn }),
94        })
95    }
96}
97
98async fn send_plus<T>(transport: &mut T, version: Version, data: Bytes) -> Result<(), Error>
99where
100    T: AsyncWrite + Unpin,
101{
102    let frame = S7PlusFrame { version, data };
103    let mut fb = BytesMut::new();
104    frame.encode(&mut fb).map_err(Error::Proto)?;
105    let tpkt = TpktFrame {
106        payload: fb.freeze(),
107    };
108    let mut tb = BytesMut::new();
109    tpkt.encode(&mut tb).map_err(Error::Proto)?;
110    transport.write_all(&tb).await?;
111    Ok(())
112}
113
114async fn recv_plus_data<T>(transport: &mut T) -> Result<Bytes, Error>
115where
116    T: AsyncRead + Unpin,
117{
118    let mut hdr = [0u8; 4];
119    transport.read_exact(&mut hdr).await?;
120    let total = u16::from_be_bytes([hdr[2], hdr[3]]) as usize;
121    let mut payload = vec![0u8; total.saturating_sub(4)];
122    transport.read_exact(&mut payload).await?;
123    let mut b = Bytes::from(payload);
124    let frame = S7PlusFrame::decode(&mut b).map_err(Error::Proto)?;
125    Ok(frame.data)
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use bytes::BufMut;
132    use tokio::io::AsyncWriteExt;
133
134    fn build_get_var_response(session_id: u32, seqnum: u16, value: &[u8]) -> Vec<u8> {
135        use bytes::BytesMut;
136        use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
137        use crate::proto::s7commplus::session::OPCODE_RESPONSE;
138        use crate::proto::tpkt::TpktFrame;
139
140        let mut da = BytesMut::new();
141        da.put_u8(OPCODE_RESPONSE);
142        da.put_u16(0x0000);
143        da.put_u16(0x054C); // FC_GET_MULTI_VAR
144        da.put_u16(0x0000);
145        da.put_u16(seqnum);
146        da.put_u32(session_id);
147        da.put_u8(0x00);
148        // payload: return_code(1) + len(2 BE) + value
149        da.put_u8(0x0A);
150        da.put_u16(value.len() as u16);
151        da.put_slice(value);
152
153        let frame = S7PlusFrame {
154            version: Version::V1,
155            data: da.freeze(),
156        };
157        let mut fb = BytesMut::new();
158        frame.encode(&mut fb).unwrap();
159        let tpkt = TpktFrame {
160            payload: fb.freeze(),
161        };
162        let mut tb = BytesMut::new();
163        tpkt.encode(&mut tb).unwrap();
164        tb.to_vec()
165    }
166
167    #[tokio::test]
168    async fn plus_db_read_returns_value() {
169        let session_id = 0x0000_0001_u32;
170        let value = [0x3F, 0x80, 0x00, 0x00]; // 1.0f32 BE
171        let response = build_get_var_response(session_id, 2, &value);
172
173        let (mut server, client_io) = tokio::io::duplex(4096);
174        tokio::spawn(async move {
175            let mut buf = vec![0u8; 4096];
176            let _ = tokio::io::AsyncReadExt::read(&mut server, &mut buf).await;
177            server.write_all(&response).await.unwrap();
178        });
179
180        let conn = PlusConnection {
181            session_id,
182            seqnum: 2,
183            version: crate::proto::s7commplus::frame::Version::V1,
184        };
185        let client = S7PlusClient {
186            inner: tokio::sync::Mutex::new(PlusInner {
187                transport: client_io,
188                conn,
189            }),
190        };
191        let data = client.db_read(1, 0, 4).await.unwrap();
192        assert_eq!(&data[..], &[0x3F, 0x80, 0x00, 0x00]);
193    }
194}