snap7_client/
connection.rs1use bytes::BytesMut;
2use crate::proto::{
3 cotp::CotpPdu,
4 s7::{
5 header::{PduType, S7Header},
6 negotiate::{NegotiateRequest, NegotiateResponse},
7 },
8 tpkt::TpktFrame,
9};
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12use crate::{
13 error::{Error, Result},
14 types::ConnectParams,
15};
16
17pub struct Connection {
18 pub pdu_size: u16,
19}
20
21pub async fn connect<T>(mut transport: T, params: &ConnectParams) -> Result<Connection>
22where
23 T: AsyncRead + AsyncWrite + Unpin,
24{
25 let cr = CotpPdu::ConnectRequest {
27 dst_ref: 0x0000,
28 src_ref: 0x0001,
29 rack: params.rack,
30 slot: params.slot,
31 };
32 send_cotp(&mut transport, &cr).await?;
33
34 let cc = recv_cotp(&mut transport).await?;
36 if !matches!(cc, CotpPdu::ConnectConfirm { .. }) {
37 return Err(Error::NegotiationFailed);
38 }
39
40 let neg_req = NegotiateRequest {
42 max_amq_calling: 1,
43 max_amq_called: 1,
44 pdu_length: params.pdu_size,
45 };
46 let mut s7_buf = BytesMut::new();
47 let header = S7Header {
48 pdu_type: PduType::Job,
49 reserved: 0,
50 pdu_ref: 1,
51 param_len: 8,
52 data_len: 0,
53 error_class: None,
54 error_code: None,
55 };
56 header.encode(&mut s7_buf);
57 neg_req.encode(&mut s7_buf);
58 send_cotp_data(&mut transport, s7_buf.freeze()).await?;
59
60 let payload = recv_cotp_data(&mut transport).await?;
62 let mut b = payload;
63 let resp_header = S7Header::decode(&mut b)?;
64 if resp_header.pdu_type != PduType::AckData {
65 return Err(Error::NegotiationFailed);
66 }
67 if let (Some(ec), Some(ecd)) = (resp_header.error_class, resp_header.error_code) {
68 if ec != 0 || ecd != 0 {
69 return Err(Error::PlcError {
70 code: ((ec as u32) << 8) | ecd as u32,
71 message: "negotiate error".into(),
72 });
73 }
74 }
75 let neg_resp = NegotiateResponse::decode(&mut b)?;
76 Ok(Connection {
77 pdu_size: neg_resp.pdu_length,
78 })
79}
80
81async fn send_cotp<T: AsyncWrite + Unpin>(transport: &mut T, pdu: &CotpPdu) -> Result<()> {
82 let mut cotp_buf = BytesMut::new();
83 pdu.encode(&mut cotp_buf);
84 let tpkt = TpktFrame {
85 payload: cotp_buf.freeze(),
86 };
87 let mut buf = BytesMut::new();
88 tpkt.encode(&mut buf)?;
89 transport.write_all(&buf).await?;
90 Ok(())
91}
92
93async fn send_cotp_data<T: AsyncWrite + Unpin>(
94 transport: &mut T,
95 payload: bytes::Bytes,
96) -> Result<()> {
97 let dt = CotpPdu::Data {
98 tpdu_nr: 0,
99 last: true,
100 payload,
101 };
102 send_cotp(transport, &dt).await
103}
104
105async fn recv_cotp<T: AsyncRead + Unpin>(transport: &mut T) -> Result<CotpPdu> {
106 let mut header = [0u8; 4];
107 transport.read_exact(&mut header).await?;
108 let total = u16::from_be_bytes([header[2], header[3]]) as usize;
109 let payload_len = total - 4;
110 let mut payload = vec![0u8; payload_len];
111 transport.read_exact(&mut payload).await?;
112 let mut b = bytes::Bytes::from(payload);
113 Ok(CotpPdu::decode(&mut b)?)
114}
115
116async fn recv_cotp_data<T: AsyncRead + Unpin>(transport: &mut T) -> Result<bytes::Bytes> {
117 let pdu = recv_cotp(transport).await?;
118 match pdu {
119 CotpPdu::Data { payload, .. } => Ok(payload),
120 _ => Err(Error::UnexpectedResponse),
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use bytes::BytesMut;
128 use crate::proto::{
129 cotp::CotpPdu,
130 s7::{
131 header::{PduType, S7Header},
132 negotiate::NegotiateResponse,
133 },
134 tpkt::TpktFrame,
135 };
136 use tokio::io::AsyncWriteExt;
137
138 async fn write_tpkt_cotp(writer: &mut (impl tokio::io::AsyncWrite + Unpin), cotp: &CotpPdu) {
139 let mut cotp_buf = BytesMut::new();
140 cotp.encode(&mut cotp_buf);
141 let tpkt = TpktFrame {
142 payload: cotp_buf.freeze(),
143 };
144 let mut buf = BytesMut::new();
145 tpkt.encode(&mut buf).unwrap();
146 writer.write_all(&buf).await.unwrap();
147 }
148
149 #[tokio::test]
150 async fn handshake_sends_cr_receives_cc() {
151 let (client_io, mut server_io) = tokio::io::duplex(4096);
152 let params = crate::types::ConnectParams::default();
153
154 tokio::spawn(async move {
155 use tokio::io::AsyncReadExt;
156 let mut buf = vec![0u8; 256];
157 let _ = server_io.read(&mut buf).await;
158 let cc = CotpPdu::ConnectConfirm {
159 dst_ref: 0x0001,
160 src_ref: 0x0001,
161 };
162 write_tpkt_cotp(&mut server_io, &cc).await;
163
164 let _ = server_io.read(&mut buf).await;
165 let neg = NegotiateResponse {
166 max_amq_calling: 1,
167 max_amq_called: 1,
168 pdu_length: 480,
169 };
170 let mut s7h = BytesMut::new();
171 let header = S7Header {
172 pdu_type: PduType::AckData,
173 reserved: 0,
174 pdu_ref: 1,
175 param_len: 8,
176 data_len: 0,
177 error_class: Some(0),
178 error_code: Some(0),
179 };
180 header.encode(&mut s7h);
181 neg.encode(&mut s7h);
182 let dt = CotpPdu::Data {
183 tpdu_nr: 0,
184 last: true,
185 payload: s7h.freeze(),
186 };
187 write_tpkt_cotp(&mut server_io, &dt).await;
188 });
189
190 let result = connect(client_io, ¶ms).await;
191 assert!(result.is_ok());
192 assert_eq!(result.unwrap().pdu_size, 480);
193 }
194
195 #[tokio::test]
196 async fn handshake_fails_when_cc_not_received() {
197 let (client_io, mut server_io) = tokio::io::duplex(4096);
198 let params = crate::types::ConnectParams::default();
199
200 tokio::spawn(async move {
201 use tokio::io::AsyncReadExt;
202 let mut buf = vec![0u8; 256];
203 let _ = server_io.read(&mut buf).await;
204 let er = CotpPdu::Error {
206 dst_ref: 0,
207 src_ref: 0,
208 reject_cause: 0,
209 };
210 write_tpkt_cotp(&mut server_io, &er).await;
211 });
212
213 let result = connect(client_io, ¶ms).await;
214 assert!(result.is_err());
215 }
216}