1use super::{
8 command::{self, Command},
9 framed::Framed,
10 EncapsulationPacket,
11};
12use crate::{codec::ClientCodec, *};
13use byteorder::{ByteOrder, LittleEndian};
14use bytes::{BufMut, Bytes, BytesMut};
15use core::fmt;
16use futures_util::{SinkExt, StreamExt};
17use rseip_core::{
18 cip::CommonPacketIter,
19 codec::{Encode, LittleEndianDecoder},
20};
21use tokio::io::{AsyncRead, AsyncWrite};
22
/// Shorthand for a [`CommonPacketIter`] that reads its items through a
/// [`LittleEndianDecoder`] parameterised by the error type `E`.
///
/// This is the value the reply-returning methods of [`EipContext`] hand back.
pub type CommonPacket<'a, E> = CommonPacketIter<'a, LittleEndianDecoder<E>>;
24
/// Client-side EIP context: a framed transport together with the session
/// state that is passed along when commands are built.
pub struct EipContext<T, E: Error> {
    /// Transport wrapped with the client codec; commands are sent into it and
    /// replies are read back from it.
    framed: Framed<T, ClientCodec<E>>,
    /// Current session handle; `0` means no session is registered.
    session_handle: u32,
    /// Sender context bytes (eight zero bytes after `new`, replaceable through
    /// `with_sender_context`).
    // NOTE(review): within this file the value is only read by the `Debug`
    // impl — confirm whether the codec is meant to consume it.
    #[allow(unused)]
    sender_context: Bytes,
}
32
33impl<T, E: Error> fmt::Debug for EipContext<T, E> {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("EipContext")
36 .field("session_handle", &self.session_handle)
37 .field("sender_context", &self.sender_context)
38 .field("framed", &"<Framed>")
39 .finish()
40 }
41}
42
43impl<T, E: Error> EipContext<T, E> {
44 #[allow(unused)]
46 #[inline]
47 pub fn with_sender_context(&mut self, sender_context: [u8; 8]) -> &mut Self {
48 let mut buf = BytesMut::new();
49 buf.put_slice(&sender_context);
50 self.sender_context = buf.freeze();
51 self
52 }
53
54 #[inline]
56 pub fn session_handle(&self) -> Option<u32> {
57 if self.session_handle > 0 {
58 Some(self.session_handle)
59 } else {
60 None
61 }
62 }
63
64 #[inline]
66 pub fn has_session(&self) -> bool {
67 self.session_handle > 0
68 }
69}
70
71impl<T, E> EipContext<T, E>
72where
73 T: AsyncRead + AsyncWrite + Unpin,
74 E: Error + 'static,
75{
76 #[inline]
78 pub fn new(transport: T) -> Self {
79 let framed = Framed::new(transport, ClientCodec::new());
80
81 Self {
82 framed,
83 session_handle: 0,
84 sender_context: Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0]),
85 }
86 }
87
88 #[inline]
90 async fn send_and_reply<C>(&mut self, cmd: C) -> Result<EncapsulationPacket<Bytes>, E>
91 where
92 C: Command,
93 {
94 let code = C::command_code();
95 log::trace!("send command: {:#0x?}", code);
96 self.framed.send(cmd).await?;
97 match self.framed.next().await {
98 Some(item) => {
99 let pkt: EncapsulationPacket<Bytes> = item?;
100 pkt.hdr.ensure_command::<E>(code)?;
101 Ok(pkt)
102 }
103 None => Err(E::custom("transport closed")),
104 }
105 }
106
107 #[inline]
109 pub async fn nop<D: Encode>(&mut self, data: D) -> Result<(), E> {
110 log::trace!("send command: NOP");
111 self.framed.send(command::Nop { data }).await?;
112 Ok(())
113 }
114
115 #[allow(unused)]
117 #[inline]
118 pub async fn list_identity<'de>(&mut self) -> Result<CommonPacket<'static, E>, E> {
119 let pkt = self.send_and_reply(command::ListIdentity).await?;
120 let res = CommonPacketIter::new(LittleEndianDecoder::<E>::new(pkt.data))?;
121 Ok(res)
122 }
123
124 #[allow(unused)]
126 #[inline]
127 pub async fn list_service<'de>(&mut self) -> Result<CommonPacket<'static, E>, E> {
128 let pkt = self.send_and_reply(command::ListServices).await?;
129 CommonPacket::new(LittleEndianDecoder::<E>::new(pkt.data))
130 }
131
132 #[allow(unused)]
134 #[inline]
135 pub async fn list_interface<'de>(&mut self) -> Result<CommonPacket<'static, E>, E> {
136 let pkt = self.send_and_reply(command::ListInterfaces).await?;
137 CommonPacket::new(LittleEndianDecoder::<E>::new(pkt.data))
138 }
139
140 #[inline]
142 pub async fn register_session(&mut self) -> Result<u32, E> {
143 let pkt = self.send_and_reply(command::RegisterSession).await?;
144 let session_handle = pkt.hdr.session_handle;
145 let reply_data = pkt.data;
146 if reply_data.len() != 4 {
147 return Err(E::invalid_length(reply_data.len(), 4));
148 }
149 #[cfg(debug_assertions)]
150 {
151 let protocol_version = LittleEndian::read_u16(&reply_data[0..2]);
152 debug_assert_eq!(protocol_version, 1);
153 let session_options = LittleEndian::read_u16(&reply_data[2..4]);
154 debug_assert_eq!(session_options, 0);
155 }
156 if session_handle == 0 {
157 return Err(E::invalid_value("session handle 0", ">0"));
158 }
159 self.session_handle = session_handle;
160 Ok(session_handle)
161 }
162
163 #[inline]
165 pub async fn unregister_session(&mut self) -> Result<(), E> {
166 if self.session_handle == 0 {
167 return Ok(());
168 }
169 log::trace!("send command: UnRegisterSession");
170 self.framed
171 .send(command::UnRegisterSession {
172 session_handle: self.session_handle,
173 })
174 .await?;
175 Ok(())
176 }
177
178 #[inline]
180 pub async fn send_rrdata<'de, D>(&mut self, data: D) -> Result<CommonPacket<'static, E>, E>
181 where
182 D: Encode,
183 {
184 let pkt = self
185 .send_and_reply(command::SendRRData {
186 session_handle: self.session_handle,
187 timeout: 0,
188 data,
189 })
190 .await?;
191 let interface_handle = LittleEndian::read_u32(&pkt.data[0..4]); debug_assert_eq!(interface_handle, 0);
193 CommonPacket::new(LittleEndianDecoder::<E>::new(pkt.data.slice(6..)))
195 }
196
197 #[inline]
199 pub async fn send_unit_data<'de, D>(
200 &mut self,
201 connection_id: u32,
202 sequence_number: u16,
203 data: D,
204 ) -> Result<CommonPacket<'static, E>, E>
205 where
206 D: Encode,
207 {
208 let pkt = self
209 .send_and_reply(command::SendUnitData {
210 session_handle: self.session_handle,
211 sequence_number,
212 connection_id,
213 data,
214 })
215 .await?;
216 let interface_handle = LittleEndian::read_u32(&pkt.data[0..4]); debug_assert_eq!(interface_handle, 0);
218 CommonPacketIter::new(LittleEndianDecoder::<E>::new(pkt.data.slice(6..)))
220 }
221}