1use bytes::{Bytes, BytesMut};
2use std::net::SocketAddr;
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4use tokio::sync::Mutex;
5
6use crate::proto::s7commplus::frame::{S7PlusFrame, Version};
7use crate::proto::s7commplus::multivar::{
8 GetMultiVarRequest, GetMultiVarResponse, SetMultiVarRequest, SetVarItem, VarSpec,
9};
10use crate::proto::s7commplus::session::{FC_DELETE_OBJECT, OPCODE_REQUEST};
11use crate::proto::s7commplus::data::DataArea;
12use crate::proto::tpkt::TpktFrame;
13
14use crate::error::Error;
15use crate::plus_connection::{plus_connect, PlusConnection};
16use crate::tls::{tls_connect, TlsStream};
17use crate::transport::TcpTransport;
18
19pub(crate) struct PlusInner<T> {
21 pub transport: T,
22 pub conn: PlusConnection,
23}
24
25pub struct S7PlusClient<T: AsyncRead + AsyncWrite + Unpin + Send> {
27 pub(crate) inner: Mutex<PlusInner<T>>,
28}
29
/// Derives the `(crc, lid)` pair used to address a byte offset inside a data block.
///
/// The `crc` component is the fixed value `0x48`. The `lid` is the base
/// `0x8400_0000` OR-ed with `db << 12` and the low 12 bits of `byte_offset`.
///
/// NOTE(review): `byte_offset` is masked with `0xFFF`, so offsets above 4095
/// alias lower addresses, and `db >= 0x4000` shifts into bits the base already
/// sets — confirm callers stay within those ranges.
fn db_lid(db: u16, byte_offset: u32) -> (u32, u32) {
    const CRC: u32 = 0x48;
    const LID_BASE: u32 = 0x8400_0000;
    const OFFSET_MASK: u32 = 0xFFF;

    let db_bits = u32::from(db) << 12;
    let offset_bits = byte_offset & OFFSET_MASK;
    (CRC, LID_BASE | db_bits | offset_bits)
}
35
/// Describes one byte range inside a data block for multi-variable requests.
#[derive(Clone, Debug)]
pub struct DbVarSpec {
    /// Data block number.
    pub db: u16,
    /// Byte offset within the data block.
    pub offset: u32,
    /// Number of bytes wanted; read results longer than this are cut to size.
    pub length: u16,
}
43
44impl<T: AsyncRead + AsyncWrite + Unpin + Send> S7PlusClient<T> {
49 pub async fn db_read(&self, db: u16, start: u32, length: u16) -> Result<Bytes, Error> {
51 let r = self.read_multi_vars(&[DbVarSpec { db, offset: start, length }]).await?;
52 Ok(r.into_iter().next().unwrap_or_default())
53 }
54
55 pub async fn db_write(&self, db: u16, start: u32, data: &[u8]) -> Result<(), Error> {
57 self.write_multi_vars(&[DbVarSpec { db, offset: start, length: data.len() as u16 }], &[Bytes::copy_from_slice(data)]).await
58 }
59
60 pub async fn read_multi_vars(&self, specs: &[DbVarSpec]) -> Result<Vec<Bytes>, Error> {
65 if specs.is_empty() {
66 return Ok(Vec::new());
67 }
68 let mut inner = self.inner.lock().await;
69 let seqnum = inner.conn.seqnum;
70 inner.conn.seqnum = seqnum.wrapping_add(1);
71 let items: Vec<VarSpec> = specs
72 .iter()
73 .map(|s| {
74 let (crc, lid) = db_lid(s.db, s.offset);
75 VarSpec { crc, lid }
76 })
77 .collect();
78
79 let req = GetMultiVarRequest {
80 seqnum,
81 session_id: inner.conn.session_id,
82 items,
83 };
84 let mut da = BytesMut::new();
85 req.encode(&mut da);
86
87 let version = inner.conn.version.clone();
88 send_plus(&mut inner.transport, version, da.freeze()).await?;
89 let data = recv_plus_data(&mut inner.transport).await?;
90 let mut b = data;
91 let resp = GetMultiVarResponse::decode(&mut b, specs.len()).map_err(Error::Proto)?;
92 let results: Vec<Bytes> = resp
93 .items
94 .into_iter()
95 .zip(specs.iter())
96 .map(|(item, spec)| {
97 let len = spec.length as usize;
98 if item.value.len() >= len {
99 item.value.slice(..len)
100 } else {
101 item.value
102 }
103 })
104 .collect();
105 Ok(results)
106 }
107
108 pub async fn write_multi_vars(&self, specs: &[DbVarSpec], values: &[Bytes]) -> Result<(), Error> {
112 if specs.is_empty() {
113 return Ok(());
114 }
115 let mut inner = self.inner.lock().await;
116 let seqnum = inner.conn.seqnum;
117 inner.conn.seqnum = seqnum.wrapping_add(1);
118 let items: Vec<SetVarItem> = specs
119 .iter()
120 .zip(values.iter())
121 .map(|(s, v)| {
122 let (crc, lid) = db_lid(s.db, s.offset);
123 SetVarItem {
124 crc,
125 lid,
126 value: v.clone(),
127 }
128 })
129 .collect();
130
131 let req = SetMultiVarRequest {
132 seqnum,
133 session_id: inner.conn.session_id,
134 items,
135 };
136 let mut da = BytesMut::new();
137 req.encode(&mut da);
138
139 let version = inner.conn.version.clone();
140 send_plus(&mut inner.transport, version, da.freeze()).await?;
141 let _data = recv_plus_data(&mut inner.transport).await?;
142 Ok(())
143 }
144
145 pub async fn send_keepalive(&self) -> Result<(), Error> {
147 let mut inner = self.inner.lock().await;
148 let frame = S7PlusFrame {
149 version: Version::KeepAlive,
150 data: Bytes::new(),
151 };
152 let mut fb = BytesMut::new();
153 frame.encode(&mut fb).map_err(Error::Proto)?;
154 let tpkt = TpktFrame {
155 payload: fb.freeze(),
156 };
157 let mut tb = BytesMut::new();
158 tpkt.encode(&mut tb).map_err(Error::Proto)?;
159 inner.transport.write_all(&tb).await?;
160 Ok(())
161 }
162
163 pub async fn delete_object(&self) -> Result<(), Error> {
165 let mut inner = self.inner.lock().await;
166 let seqnum = inner.conn.seqnum;
167 inner.conn.seqnum = seqnum.wrapping_add(1);
168
169 let da = DataArea {
171 opcode: OPCODE_REQUEST,
172 function_code: FC_DELETE_OBJECT,
173 seqnum,
174 session_id: inner.conn.session_id,
175 transport_flags: 0,
176 payload: Bytes::new(),
177 };
178 let mut buf = BytesMut::new();
179 da.encode(&mut buf);
180
181 let version = inner.conn.version.clone();
182 send_plus(&mut inner.transport, version, buf.freeze()).await?;
183 let _resp = recv_plus_data(&mut inner.transport).await?;
184 Ok(())
185 }
186}
187
188impl S7PlusClient<TcpTransport> {
193 pub async fn connect(
195 addr: SocketAddr,
196 params: crate::types::ConnectParams,
197 ) -> Result<Self, Error> {
198 let transport = TcpTransport::connect(addr, params.connect_timeout).await?;
199 let (conn, transport) = plus_connect(transport).await?;
200 Ok(S7PlusClient {
201 inner: Mutex::new(PlusInner { transport, conn }),
202 })
203 }
204}
205
206impl S7PlusClient<TlsStream> {
211 pub async fn connect_tls(
216 addr: SocketAddr,
217 server_name: &str,
218 extra_ca_der: Option<&[u8]>,
219 _params: crate::types::ConnectParams,
220 ) -> Result<Self, Error> {
221 let transport = tls_connect(addr, server_name, extra_ca_der).await?;
222 let (conn, transport) = plus_connect(transport).await?;
223 Ok(S7PlusClient {
224 inner: Mutex::new(PlusInner { transport, conn }),
225 })
226 }
227}
228
229async fn send_plus<T>(transport: &mut T, version: Version, data: Bytes) -> Result<(), Error>
234where
235 T: AsyncWrite + Unpin,
236{
237 let frame = S7PlusFrame { version, data };
238 let mut fb = BytesMut::new();
239 frame.encode(&mut fb).map_err(Error::Proto)?;
240 let tpkt = TpktFrame {
241 payload: fb.freeze(),
242 };
243 let mut tb = BytesMut::new();
244 tpkt.encode(&mut tb).map_err(Error::Proto)?;
245 transport.write_all(&tb).await?;
246 Ok(())
247}
248
249async fn recv_plus_data<T>(transport: &mut T) -> Result<Bytes, Error>
250where
251 T: AsyncRead + Unpin,
252{
253 let mut hdr = [0u8; 4];
254 transport.read_exact(&mut hdr).await?;
255 let total = u16::from_be_bytes([hdr[2], hdr[3]]) as usize;
256 let mut payload = vec![0u8; total.saturating_sub(4)];
257 transport.read_exact(&mut payload).await?;
258 let mut b = Bytes::from(payload);
259 let frame = S7PlusFrame::decode(&mut b).map_err(Error::Proto)?;
260 Ok(frame.data)
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use tokio::io::AsyncWriteExt;

    /// Builds the on-wire bytes of a single-value response: a hand-assembled
    /// data area wrapped in a `Version::V1` `S7PlusFrame` and a `TpktFrame`.
    fn build_get_var_response(session_id: u32, seqnum: u16, value: &[u8]) -> Vec<u8> {
        use crate::proto::s7commplus::session::OPCODE_RESPONSE;

        let mut body = BytesMut::new();
        body.put_u8(OPCODE_RESPONSE);
        body.put_u16(0x0000);
        body.put_u16(0x054C);
        body.put_u16(0x0000);
        body.put_u16(seqnum);
        body.put_u32(session_id);
        body.put_u8(0x00);
        body.put_u8(0x0A);
        body.put_u16(value.len() as u16);
        body.put_slice(value);

        let mut framed = BytesMut::new();
        S7PlusFrame {
            version: Version::V1,
            data: body.freeze(),
        }
        .encode(&mut framed)
        .unwrap();

        let mut wire = BytesMut::new();
        TpktFrame {
            payload: framed.freeze(),
        }
        .encode(&mut wire)
        .unwrap();
        wire.to_vec()
    }

    /// `db_read` against a fake server returns the four value bytes it sent.
    #[tokio::test]
    async fn plus_db_read_returns_value() {
        let session_id = 0x0000_0001_u32;
        let value = [0x3F, 0x80, 0x00, 0x00];
        let response = build_get_var_response(session_id, 2, &value);

        // Fake server: read whatever the client sends first, then answer
        // with the prepared response.
        let (mut server, client_io) = tokio::io::duplex(4096);
        tokio::spawn(async move {
            let mut request = vec![0u8; 4096];
            let _ = tokio::io::AsyncReadExt::read(&mut server, &mut request).await;
            server.write_all(&response).await.unwrap();
        });

        let client = S7PlusClient {
            inner: Mutex::new(PlusInner {
                transport: client_io,
                conn: PlusConnection {
                    session_id,
                    seqnum: 2,
                    version: Version::V1,
                },
            }),
        };

        let data = client.db_read(1, 0, 4).await.unwrap();
        assert_eq!(&data[..], &[0x3F, 0x80, 0x00, 0x00]);
    }
}