s7_client/
client.rs

1use std::{
2    net::{IpAddr, SocketAddr},
3    time::Duration
4};
5
6use crate::{
7    build_copt_connect_request, build_s7_read,
8    build_s7_setup, build_s7_write, error::*
9};
10use bytes::BytesMut;
11use copt::{
12    CoptDecoder, CoptFrame, Parameter, PduType,
13    TpduSize
14};
15use log::debug;
16use s7_comm::{
17    AckData, DataItemVal, DataItemWriteResponse,
18    Frame, S7CommDecoder
19};
20use tokio::{
21    io::{AsyncReadExt, AsyncWriteExt},
22    net::TcpStream,
23    time::timeout
24};
25use tokio_util::codec::Decoder;
26use tpkt::{TpktDecoder, TpktFrame};
27
28mod param;
29mod request_param;
30
31pub use param::*;
32pub use request_param::*;
33
34pub struct S7Client {
35    options: Options,
36    connect: TcpStream
37}
38
39impl S7Client {
40    pub async fn connect(
41        options: Options
42    ) -> Result<Self> {
43        let connect =
44            tokio::net::TcpStream::connect(
45                SocketAddr::new(
46                    options.address,
47                    options.port
48                )
49            )
50            .await?;
51        let mut client =
52            Self { options, connect };
53        client.copt_connect().await?;
54        client.s7_setup().await?;
55        Ok(client)
56    }
57
58    async fn copt_connect(
59        &mut self
60    ) -> Result<()> {
61        let frame =
62            build_framed_copt_connect_request(
63                &self.options
64            )?;
65        self.write_frame(frame).await?;
66        let frame =
67            self.read_frame().await?.payload();
68        if let PduType::ConnectConfirm(comm) =
69            &frame.pdu_type
70        {
71            debug!("{:?}", comm);
72            for item in &comm.parameters {
73                if let Parameter::TpduSize(size) =
74                    item
75                {
76                    self.options.tpdu_size =
77                        size.clone();
78                }
79            }
80        } else {
81            return Err(Error::ConnectErr(
82                format!(
83                    "should recv connect \
84                     confirm, but not {:?}",
85                    frame
86                )
87            ));
88        }
89        Ok(())
90    }
91
92    async fn s7_setup(&mut self) -> Result<()> {
93        let frame =
94            build_framed_s7_setup(&self.options)?;
95        self.write_frame(frame).await?;
96        let frame =
97            self.read_frame().await?.payload();
98        if let PduType::DtData(comm) =
99            frame.pdu_type
100        {
101            if let Frame::AckData {
102                ack_data,
103                ..
104            } = comm.payload()
105            {
106                if let AckData::SetupCommunication(data) = ack_data {
107                        debug!("{:?}", data);
108                        self.options.pdu_len = data.pdu_length();
109                    }
110            }
111        } else {
112            return Err(Error::ConnectErr(
113                format!(
114                    "should recv connect \
115                     confirm, but not {:?}",
116                    frame
117                )
118            ));
119        }
120        Ok(())
121    }
122
123    pub async fn write_db_bytes(
124        &mut self,
125        db_number: u16,
126        byte_addr: u16,
127        data: &[u8]
128    ) -> Result<Vec<DataItemWriteResponse>> {
129        let frame = build_s7_write()
130            .pdu_ref(
131                self.options.tpdu_size.pdu_ref()
132            )
133            .write_db_bytes(
134                db_number, byte_addr, data
135            )
136            .build()?;
137
138        self.write(frame).await
139    }
140
141    pub async fn write_db_bit(
142        &mut self,
143        db_number: u16,
144        byte_addr: u16,
145        bit_addr: u8,
146        data: bool
147    ) -> Result<Vec<DataItemWriteResponse>> {
148        let frame = build_s7_write()
149            .pdu_ref(
150                self.options.tpdu_size.pdu_ref()
151            )
152            .write_db_bit(
153                db_number, byte_addr, bit_addr,
154                data
155            )
156            .build()?;
157        self.write(frame).await
158    }
159
160    async fn write(
161        &mut self,
162        frame: BytesMut
163    ) -> Result<Vec<DataItemWriteResponse>> {
164        self.write_frame(frame).await?;
165        let frame =
166            self.read_frame().await?.payload();
167        if let PduType::DtData(comm) =
168            frame.pdu_type
169        {
170            if let Frame::AckData {
171                ack_data,
172                ..
173            } = comm.payload()
174            {
175                if let AckData::WriteVar(data) =
176                    ack_data
177                {
178                    return Ok(data.data_item());
179                }
180            }
181        }
182        return Err(Error::Err(format!(
183            "should recv read var"
184        )));
185    }
186
187    pub async fn read(
188        &mut self,
189        areas: Vec<Area>
190    ) -> Result<Vec<DataItemVal>> {
191        let frame = build_framed_s7_read(
192            &self.options,
193            areas
194        )?;
195        self.write_frame(frame).await?;
196        let frame =
197            self.read_frame().await?.payload();
198        if let PduType::DtData(comm) =
199            frame.pdu_type
200        {
201            if let Frame::AckData {
202                ack_data,
203                ..
204            } = comm.payload()
205            {
206                if let AckData::ReadVar(data) =
207                    ack_data
208                {
209                    return Ok(data.data_item());
210                }
211            }
212        }
213        return Err(Error::Err(format!(
214            "should recv read var"
215        )));
216    }
217
218    async fn write_frame(
219        &mut self,
220        framed: BytesMut
221    ) -> Result<()> {
222        timeout(
223            self.options.write_timeout,
224            self.connect.write_all(&framed)
225        )
226        .await
227        .map_err(|_| Error::WriteTimeout)??;
228        Ok(())
229    }
230
231    async fn read_frame(
232        &mut self
233    ) -> Result<TpktFrame<CoptFrame<Frame>>> {
234        Ok(timeout(
235            self.options.read_timeout,
236            read_framed(&mut self.connect)
237        )
238        .await
239        .map_err(|_| Error::WriteTimeout)??)
240    }
241}
242
243#[derive(Debug, Clone)]
244pub struct Options {
245    pub read_timeout:  Duration,
246    pub write_timeout: Duration,
247    address:           IpAddr,
248    port:              u16,
249    pub conn_mode:     ConnectMode,
250    pub tpdu_size:     TpduSize,
251    //PDULength variable to store pdu length
252    // after connect
253    pdu_len:           u16
254}
255
256impl Options {
257    pub fn new(
258        address: IpAddr,
259        port: u16,
260        conn_mode: ConnectMode
261    ) -> Options {
262        Self {
263            read_timeout: Duration::from_millis(
264                500
265            ),
266            write_timeout: Duration::from_millis(
267                500
268            ),
269            port,
270            address,
271            conn_mode,
272            pdu_len: 480,
273            tpdu_size: TpduSize::L2048
274        }
275    }
276}
277
278async fn read_framed(
279    req: &mut TcpStream
280) -> Result<TpktFrame<CoptFrame<Frame>>> {
281    let mut buf = [0u8; 1000];
282    let mut bytes = BytesMut::new();
283    let mut decoder =
284        TpktDecoder(CoptDecoder(S7CommDecoder));
285    loop {
286        let size = req.read(&mut buf).await?;
287        bytes.extend_from_slice(
288            buf[0..size].as_ref()
289        );
290        if let Some(frame) =
291            decoder.decode(&mut bytes)?
292        {
293            return Ok(frame);
294        }
295    }
296}
297
298fn build_framed_s7_read(
299    options: &Options,
300    areas: Vec<Area>
301) -> Result<BytesMut> {
302    let mut builder = build_s7_read()
303        .pdu_ref(options.tpdu_size.pdu_ref());
304    for area in areas {
305        builder = builder.add_item(area.into());
306    }
307    Ok(builder.build()?)
308}
309
310fn build_framed_copt_connect_request(
311    options: &Options
312) -> Result<BytesMut> {
313    Ok(build_copt_connect_request()
314        .source_ref([0, 1])
315        .destination_ref([0, 0])
316        .class_and_others(0, false, false)
317        .pdu_size(TpduSize::L1024)
318        .src_tsap(options.conn_mode.local_tsap())
319        .dst_tsap(options.conn_mode.remote_tsap())
320        .build_to_request()?)
321}
322
323fn build_framed_s7_setup(
324    options: &Options
325) -> Result<BytesMut> {
326    Ok(build_s7_setup()
327        .max_amq_called(1)
328        .max_amq_calling(1)
329        .pdu_length(options.pdu_len)
330        .pdu_ref(options.tpdu_size.pdu_ref())
331        .build()?)
332}