Skip to main content

snap7_client/
connection.rs

1use bytes::BytesMut;
2use crate::proto::{
3    cotp::CotpPdu,
4    s7::{
5        header::{PduType, S7Header},
6        negotiate::{NegotiateRequest, NegotiateResponse},
7    },
8    tpkt::TpktFrame,
9};
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12use crate::{
13    error::{Error, Result},
14    types::ConnectParams,
15};
16
17pub struct Connection {
18    pub pdu_size: u16,
19}
20
21pub async fn connect<T>(mut transport: T, params: &ConnectParams) -> Result<Connection>
22where
23    T: AsyncRead + AsyncWrite + Unpin,
24{
25    // Step 1: send COTP CR
26    let cr = CotpPdu::ConnectRequest {
27        dst_ref: 0x0000,
28        src_ref: 0x0001,
29        rack: params.rack,
30        slot: params.slot,
31    };
32    send_cotp(&mut transport, &cr).await?;
33
34    // Step 2: receive COTP CC
35    let cc = recv_cotp(&mut transport).await?;
36    if !matches!(cc, CotpPdu::ConnectConfirm { .. }) {
37        return Err(Error::NegotiationFailed);
38    }
39
40    // Step 3: send S7 negotiate request
41    let neg_req = NegotiateRequest {
42        max_amq_calling: 1,
43        max_amq_called: 1,
44        pdu_length: params.pdu_size,
45    };
46    let mut s7_buf = BytesMut::new();
47    let header = S7Header {
48        pdu_type: PduType::Job,
49        reserved: 0,
50        pdu_ref: 1,
51        param_len: 8,
52        data_len: 0,
53        error_class: None,
54        error_code: None,
55    };
56    header.encode(&mut s7_buf);
57    neg_req.encode(&mut s7_buf);
58    send_cotp_data(&mut transport, s7_buf.freeze()).await?;
59
60    // Step 4: receive S7 negotiate response
61    let payload = recv_cotp_data(&mut transport).await?;
62    let mut b = payload;
63    let resp_header = S7Header::decode(&mut b)?;
64    if resp_header.pdu_type != PduType::AckData {
65        return Err(Error::NegotiationFailed);
66    }
67    if let (Some(ec), Some(ecd)) = (resp_header.error_class, resp_header.error_code) {
68        if ec != 0 || ecd != 0 {
69            return Err(Error::PlcError {
70                code: ((ec as u32) << 8) | ecd as u32,
71                message: "negotiate error".into(),
72            });
73        }
74    }
75    let neg_resp = NegotiateResponse::decode(&mut b)?;
76    Ok(Connection {
77        pdu_size: neg_resp.pdu_length,
78    })
79}
80
81async fn send_cotp<T: AsyncWrite + Unpin>(transport: &mut T, pdu: &CotpPdu) -> Result<()> {
82    let mut cotp_buf = BytesMut::new();
83    pdu.encode(&mut cotp_buf);
84    let tpkt = TpktFrame {
85        payload: cotp_buf.freeze(),
86    };
87    let mut buf = BytesMut::new();
88    tpkt.encode(&mut buf)?;
89    transport.write_all(&buf).await?;
90    Ok(())
91}
92
93async fn send_cotp_data<T: AsyncWrite + Unpin>(
94    transport: &mut T,
95    payload: bytes::Bytes,
96) -> Result<()> {
97    let dt = CotpPdu::Data {
98        tpdu_nr: 0,
99        last: true,
100        payload,
101    };
102    send_cotp(transport, &dt).await
103}
104
105async fn recv_cotp<T: AsyncRead + Unpin>(transport: &mut T) -> Result<CotpPdu> {
106    let mut header = [0u8; 4];
107    transport.read_exact(&mut header).await?;
108    let total = u16::from_be_bytes([header[2], header[3]]) as usize;
109    let payload_len = total - 4;
110    let mut payload = vec![0u8; payload_len];
111    transport.read_exact(&mut payload).await?;
112    let mut b = bytes::Bytes::from(payload);
113    Ok(CotpPdu::decode(&mut b)?)
114}
115
116async fn recv_cotp_data<T: AsyncRead + Unpin>(transport: &mut T) -> Result<bytes::Bytes> {
117    let pdu = recv_cotp(transport).await?;
118    match pdu {
119        CotpPdu::Data { payload, .. } => Ok(payload),
120        _ => Err(Error::UnexpectedResponse),
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127    use bytes::BytesMut;
128    use crate::proto::{
129        cotp::CotpPdu,
130        s7::{
131            header::{PduType, S7Header},
132            negotiate::NegotiateResponse,
133        },
134        tpkt::TpktFrame,
135    };
136    use tokio::io::AsyncWriteExt;
137
138    async fn write_tpkt_cotp(writer: &mut (impl tokio::io::AsyncWrite + Unpin), cotp: &CotpPdu) {
139        let mut cotp_buf = BytesMut::new();
140        cotp.encode(&mut cotp_buf);
141        let tpkt = TpktFrame {
142            payload: cotp_buf.freeze(),
143        };
144        let mut buf = BytesMut::new();
145        tpkt.encode(&mut buf).unwrap();
146        writer.write_all(&buf).await.unwrap();
147    }
148
149    #[tokio::test]
150    async fn handshake_sends_cr_receives_cc() {
151        let (client_io, mut server_io) = tokio::io::duplex(4096);
152        let params = crate::types::ConnectParams::default();
153
154        tokio::spawn(async move {
155            use tokio::io::AsyncReadExt;
156            let mut buf = vec![0u8; 256];
157            let _ = server_io.read(&mut buf).await;
158            let cc = CotpPdu::ConnectConfirm {
159                dst_ref: 0x0001,
160                src_ref: 0x0001,
161            };
162            write_tpkt_cotp(&mut server_io, &cc).await;
163
164            let _ = server_io.read(&mut buf).await;
165            let neg = NegotiateResponse {
166                max_amq_calling: 1,
167                max_amq_called: 1,
168                pdu_length: 480,
169            };
170            let mut s7h = BytesMut::new();
171            let header = S7Header {
172                pdu_type: PduType::AckData,
173                reserved: 0,
174                pdu_ref: 1,
175                param_len: 8,
176                data_len: 0,
177                error_class: Some(0),
178                error_code: Some(0),
179            };
180            header.encode(&mut s7h);
181            neg.encode(&mut s7h);
182            let dt = CotpPdu::Data {
183                tpdu_nr: 0,
184                last: true,
185                payload: s7h.freeze(),
186            };
187            write_tpkt_cotp(&mut server_io, &dt).await;
188        });
189
190        let result = connect(client_io, &params).await;
191        assert!(result.is_ok());
192        assert_eq!(result.unwrap().pdu_size, 480);
193    }
194
195    #[tokio::test]
196    async fn handshake_fails_when_cc_not_received() {
197        let (client_io, mut server_io) = tokio::io::duplex(4096);
198        let params = crate::types::ConnectParams::default();
199
200        tokio::spawn(async move {
201            use tokio::io::AsyncReadExt;
202            let mut buf = vec![0u8; 256];
203            let _ = server_io.read(&mut buf).await;
204            // Send ER (Error) instead of CC
205            let er = CotpPdu::Error {
206                dst_ref: 0,
207                src_ref: 0,
208                reject_cause: 0,
209            };
210            write_tpkt_cotp(&mut server_io, &er).await;
211        });
212
213        let result = connect(client_io, &params).await;
214        assert!(result.is_err());
215    }
216}