snap7_client/
plus_client.rs1use bytes::{Bytes, BytesMut};
2use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
3use tokio::sync::Mutex;
4
5use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
6use crate::proto::s7commplus::multivar::{GetVarRequest, GetVarResponse, SetVarRequest};
7use crate::proto::tpkt::TpktFrame;
8
9use crate::error::Error;
10use crate::plus_connection::{plus_connect, PlusConnection};
11use crate::transport::TcpTransport;
12
13pub(crate) struct PlusInner<T> {
15 pub transport: T,
16 pub conn: PlusConnection,
17}
18
19pub struct S7PlusClient<T: AsyncRead + AsyncWrite + Unpin + Send> {
21 pub(crate) inner: Mutex<PlusInner<T>>,
22}
23
24fn db_lid(db: u16, byte_offset: u32) -> (u32, u32) {
25 let crc = 0x48u32;
26 let lid = 0x8400_0000u32 | ((db as u32) << 12) | (byte_offset & 0xFFF);
27 (crc, lid)
28}
29
30impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7PlusClient<T> {
31 pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes, Error> {
33 let mut inner = self.inner.lock().await;
34 let seqnum = inner.conn.seqnum;
35 inner.conn.seqnum = seqnum.wrapping_add(1);
36 let (crc, lid) = db_lid(db, start);
37
38 let req = GetVarRequest {
39 seqnum,
40 session_id: inner.conn.session_id,
41 crc,
42 lid,
43 };
44 let mut da = BytesMut::new();
45 req.encode(&mut da);
46
47 let version = inner.conn.version.clone();
48 send_plus(&mut inner.transport, version, da.freeze()).await?;
49 let data = recv_plus_data(&mut inner.transport).await?;
50 let mut b = data;
51 let resp = GetVarResponse::decode(&mut b).map_err(Error::Proto)?;
52 let len = length as usize;
53 if resp.value.len() >= len {
54 Ok(resp.value.slice(..len))
55 } else {
56 Ok(resp.value)
57 }
58 }
59
60 pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<(), Error> {
62 let mut inner = self.inner.lock().await;
63 let seqnum = inner.conn.seqnum;
64 inner.conn.seqnum = seqnum.wrapping_add(1);
65 let (crc, lid) = db_lid(db, start);
66
67 let req = SetVarRequest {
68 seqnum,
69 session_id: inner.conn.session_id,
70 crc,
71 lid,
72 value: Bytes::copy_from_slice(data),
73 };
74 let mut da = BytesMut::new();
75 req.encode(&mut da);
76
77 let version = inner.conn.version.clone();
78 send_plus(&mut inner.transport, version, da.freeze()).await?;
79 let _resp = recv_plus_data(&mut inner.transport).await?;
80 Ok(())
81 }
82}
83
84impl S7PlusClient<TcpTransport> {
85 pub async fn connect(
87 addr: std::net::SocketAddr,
88 params: crate::types::ConnectParams,
89 ) -> Result<Self, Error> {
90 let transport = TcpTransport::connect(addr, params.connect_timeout).await?;
91 let (conn, transport) = plus_connect(transport).await?;
92 Ok(S7PlusClient {
93 inner: Mutex::new(PlusInner { transport, conn }),
94 })
95 }
96}
97
98async fn send_plus<T>(transport: &mut T, version: Version, data: Bytes) -> Result<(), Error>
99where
100 T: AsyncWrite + Unpin,
101{
102 let frame = S7PlusFrame { version, data };
103 let mut fb = BytesMut::new();
104 frame.encode(&mut fb).map_err(Error::Proto)?;
105 let tpkt = TpktFrame {
106 payload: fb.freeze(),
107 };
108 let mut tb = BytesMut::new();
109 tpkt.encode(&mut tb).map_err(Error::Proto)?;
110 transport.write_all(&tb).await?;
111 Ok(())
112}
113
114async fn recv_plus_data<T>(transport: &mut T) -> Result<Bytes, Error>
115where
116 T: AsyncRead + Unpin,
117{
118 let mut hdr = [0u8; 4];
119 transport.read_exact(&mut hdr).await?;
120 let total = u16::from_be_bytes([hdr[2], hdr[3]]) as usize;
121 let mut payload = vec![0u8; total.saturating_sub(4)];
122 transport.read_exact(&mut payload).await?;
123 let mut b = Bytes::from(payload);
124 let frame = S7PlusFrame::decode(&mut b).map_err(Error::Proto)?;
125 Ok(frame.data)
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131 use bytes::BufMut;
132 use tokio::io::AsyncWriteExt;
133
134 fn build_get_var_response(session_id: u32, seqnum: u16, value: &[u8]) -> Vec<u8> {
135 use bytes::BytesMut;
136 use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
137 use crate::proto::s7commplus::session::OPCODE_RESPONSE;
138 use crate::proto::tpkt::TpktFrame;
139
140 let mut da = BytesMut::new();
141 da.put_u8(OPCODE_RESPONSE);
142 da.put_u16(0x0000);
143 da.put_u16(0x054C); da.put_u16(0x0000);
145 da.put_u16(seqnum);
146 da.put_u32(session_id);
147 da.put_u8(0x00);
148 da.put_u8(0x0A);
150 da.put_u16(value.len() as u16);
151 da.put_slice(value);
152
153 let frame = S7PlusFrame {
154 version: Version::V1,
155 data: da.freeze(),
156 };
157 let mut fb = BytesMut::new();
158 frame.encode(&mut fb).unwrap();
159 let tpkt = TpktFrame {
160 payload: fb.freeze(),
161 };
162 let mut tb = BytesMut::new();
163 tpkt.encode(&mut tb).unwrap();
164 tb.to_vec()
165 }
166
167 #[tokio::test]
168 async fn plus_db_read_returns_value() {
169 let session_id = 0x0000_0001_u32;
170 let value = [0x3F, 0x80, 0x00, 0x00]; let response = build_get_var_response(session_id, 2, &value);
172
173 let (mut server, client_io) = tokio::io::duplex(4096);
174 tokio::spawn(async move {
175 let mut buf = vec![0u8; 4096];
176 let _ = tokio::io::AsyncReadExt::read(&mut server, &mut buf).await;
177 server.write_all(&response).await.unwrap();
178 });
179
180 let conn = PlusConnection {
181 session_id,
182 seqnum: 2,
183 version: crate::proto::s7commplus::frame::Version::V1,
184 };
185 let client = S7PlusClient {
186 inner: tokio::sync::Mutex::new(PlusInner {
187 transport: client_io,
188 conn,
189 }),
190 };
191 let data = client.db_read(1, 0, 4).await.unwrap();
192 assert_eq!(&data[..], &[0x3F, 0x80, 0x00, 0x00]);
193 }
194}