rseip/client/
mod.rs

1// rseip
2//
3// rseip - Ethernet/IP (CIP) in pure Rust.
4// Copyright: 2021, Joylei <leingliu@gmail.com>
5// License: MIT
6
7/// AB EIP
8pub mod ab_eip;
9/// generic EIP
10pub mod eip;
11
12use crate::{adapters::Service, ClientError, Result};
13pub use ab_eip::{AbEipClient, AbEipConnection, AbEipDriver, AbService, AbTemplateService};
14use bytes::Bytes;
15use core::{
16    fmt,
17    ops::{Deref, DerefMut},
18};
19pub use eip::*;
20use futures_util::future::BoxFuture;
21/// reexport
22pub use rseip_cip::connection::OpenOptions;
23use rseip_cip::{
24    connection::ForwardCloseRequest,
25    service::Heartbeat,
26    service::{request::UnconnectedSend, MessageService},
27    *,
28};
29use rseip_core::{
30    codec::{Decode, Encode},
31    Either, Error,
32};
33use std::{io, sync::atomic::AtomicU16};
34
35/// driver for specified protocol
36pub trait Driver: Send + Sync {
37    /// endpoint, eg: IP address for EIP
38    type Endpoint: fmt::Debug + Clone + Send + Sync;
39
40    /// driver specific service for CIP
41    type Service: Service + fmt::Debug + Send + Sync;
42
43    /// create service
44    fn build_service(addr: Self::Endpoint) -> BoxFuture<'static, Result<Self::Service>>;
45}
46
47/// explicit messaging client
48#[derive(Debug, Default)]
49pub struct Client<B: Driver> {
50    /// end point, driver specific
51    addr: B::Endpoint,
52    /// cip service, driver specific
53    service: Option<B::Service>,
54    /// connection path
55    connection_path: EPath,
56}
57
58impl<B: Driver> Client<B> {
59    /// create [`Client`]
60    #[inline]
61    pub fn new(addr: B::Endpoint) -> Self {
62        Self {
63            addr,
64            service: None,
65            connection_path: Default::default(),
66        }
67    }
68
69    /// set connection path
70    #[inline]
71    pub fn with_connection_path(mut self, path: impl Into<EPath>) -> Self {
72        self.connection_path = path.into();
73        self
74    }
75
76    /// current connection path
77    #[inline]
78    pub fn connection_path(&self) -> &EPath {
79        &self.connection_path
80    }
81
82    /// current remote endpoint, driver specific
83    #[inline]
84    pub fn remote_endpoint(&self) -> &B::Endpoint {
85        &self.addr
86    }
87
88    #[inline]
89    async fn ensure_service(&mut self) -> Result<()> {
90        if self.service.is_none() {
91            let service = B::build_service(self.addr.clone()).await?;
92            self.service = Some(service);
93        }
94        match self.service {
95            None => unreachable!(),
96            Some(ref mut service) => {
97                service.open().await?;
98            }
99        }
100        Ok(())
101    }
102}
103
104#[async_trait::async_trait]
105impl<B: Driver> Heartbeat for Client<B> {
106    type Error = ClientError;
107    /// send Heartbeat message to keep underline transport alive
108    #[inline]
109    async fn heartbeat(&mut self) -> Result<()> {
110        if let Some(ref mut service) = self.service {
111            service.heartbeat().await?;
112        }
113        Ok(())
114    }
115}
116
117/// message  request handler
118#[async_trait::async_trait]
119impl<B: Driver> MessageService for Client<B> {
120    type Error = ClientError;
121
122    /// unconnected send
123    #[inline]
124    async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
125    where
126        P: Encode + Send + Sync,
127        D: Encode + Send + Sync,
128        R: MessageReplyInterface + Decode<'de> + 'static,
129    {
130        // create service if not created
131        self.ensure_service().await?;
132        let service = self.service.as_mut().expect("expected service");
133        let req = UnconnectedSend::new(self.connection_path.clone(), mr);
134        let res = service.unconnected_send(req).await?;
135        Ok(res)
136    }
137
138    /// close underline transport
139    #[inline]
140    async fn close(&mut self) -> Result<()> {
141        if let Some(mut service) = self.service.take() {
142            let _ = service.close().await;
143        }
144        Ok(())
145    }
146
147    /// is underline transport closed?
148    #[inline]
149    fn closed(&self) -> bool {
150        self.service.is_none()
151    }
152}
153
154/// explicit messaging connection
155#[derive(Debug)]
156pub struct Connection<B: Driver> {
157    addr: B::Endpoint,
158    origin_options: OpenOptions,
159    connected_options: Option<OpenOptions>,
160    /// underline service
161    service: Option<B::Service>,
162    /// sequence number
163    seq_id: AtomicU16,
164}
165
166impl<B: Driver> Connection<B> {
167    /// Create connection
168    #[inline]
169    pub fn new(addr: B::Endpoint, options: OpenOptions) -> Self {
170        Self {
171            addr,
172            origin_options: options,
173            connected_options: None,
174            service: None,
175            seq_id: Default::default(),
176        }
177    }
178
179    /// current remote endpoint, driver specific
180    #[inline]
181    pub fn remote_endpoint(&self) -> &B::Endpoint {
182        &self.addr
183    }
184
185    /// CIP connection id
186    #[inline]
187    pub fn connection_id(&self) -> Option<u32> {
188        self.connected_options.as_ref().map(|v| v.o_t_connection_id)
189    }
190
191    /// is CIP connection built?
192    #[inline]
193    pub fn connected(&self) -> bool {
194        self.connection_id().is_some()
195    }
196
197    /// generate next sequence number
198    #[inline]
199    fn next_sequence_number(&mut self) -> u16 {
200        loop {
201            let v = self
202                .seq_id
203                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204            // NOTE: sequence_number cannot be 0
205            if v > 0 {
206                return v;
207            }
208        }
209    }
210
211    /// close current connection and open a new connection
212    #[inline]
213    pub async fn reconnect(&mut self) -> Result<()> {
214        self.close_connection().await?;
215        self.open_connection().await?;
216        Ok(())
217    }
218
219    #[inline]
220    async fn ensure_service(&mut self) -> Result<()> {
221        if self.service.is_none() {
222            let service = B::build_service(self.addr.clone()).await?;
223            self.service = Some(service);
224        }
225        match self.service {
226            None => unreachable!(),
227            Some(ref mut service) => {
228                service.open().await?;
229            }
230        }
231        Ok(())
232    }
233
234    /// open connection if not already connected
235    #[inline]
236    async fn open_connection(&mut self) -> Result<u32> {
237        // create service if not created
238        self.ensure_service().await?;
239        let service = self.service.as_mut().expect("expected service");
240        if self.connected_options.is_none() {
241            let reply = service.forward_open(self.origin_options.clone()).await?;
242            match reply.into_value() {
243                Either::Left(reply) => {
244                    let opts = self
245                        .origin_options
246                        .clone()
247                        .o_t_connection_id(reply.o_t_connection_id)
248                        .connection_serial_number(reply.connection_serial_number)
249                        .o_t_rpi(reply.o_t_api)
250                        .t_o_rpi(reply.t_o_api);
251                    self.connected_options = Some(opts);
252                }
253                Either::Right(_) => return Err(Error::custom("forward open failed")),
254            }
255        }
256        Ok(self.connection_id().unwrap())
257    }
258
259    /// close connection
260    #[inline]
261    async fn close_connection(&mut self) -> Result<()> {
262        if let Some(conn) = self.connected_options.take() {
263            if let Some(service) = self.service.as_mut() {
264                let request = ForwardCloseRequest {
265                    priority_time_ticks: self.origin_options.priority_tick_time,
266                    timeout_ticks: self.origin_options.timeout_ticks,
267                    connection_serial_number: conn.connection_serial_number,
268                    originator_serial_number: conn.originator_serial_number,
269                    originator_vendor_id: conn.vendor_id,
270                    connection_path: conn.connection_path,
271                };
272                let _ = service.forward_close(request).await;
273            }
274        }
275        Ok(())
276    }
277}
278
279#[async_trait::async_trait]
280impl<B: Driver> Heartbeat for Connection<B> {
281    type Error = ClientError;
282
283    /// send Heartbeat message to keep underline transport alive
284    #[inline]
285    async fn heartbeat(&mut self) -> Result<()> {
286        if let Some(ref mut service) = self.service {
287            service.heartbeat().await?;
288            //TODO: is there a way to keep CIP connection alive?
289        }
290        Ok(())
291    }
292}
293
294#[async_trait::async_trait]
295impl<B: Driver> MessageService for Connection<B> {
296    type Error = ClientError;
297    /// connected send
298    #[inline]
299    async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
300    where
301        P: Encode + Send + Sync,
302        D: Encode + Send + Sync,
303        R: MessageReplyInterface + Decode<'de> + 'static,
304    {
305        // create connection if not connected
306        let cid = self.open_connection().await?;
307        let sid = self.next_sequence_number();
308        let service = self.service.as_mut().expect("expected service");
309        service.connected_send(cid, sid, mr).await
310    }
311
312    /// close current connection and underline transport
313    #[inline]
314    async fn close(&mut self) -> Result<()> {
315        let _ = self.close_connection().await;
316        if let Some(mut service) = self.service.take() {
317            let _ = service.close().await;
318        }
319        Ok(())
320    }
321
322    /// is connection closed?
323    #[inline]
324    fn closed(&self) -> bool {
325        self.connected_options.is_none()
326    }
327}
328
329/// client with CIP connection or without CIP connection
330#[derive(Debug)]
331pub struct MaybeConnected<B: Driver>(Either<Client<B>, Connection<B>>);
332
333impl<B: Driver> Deref for MaybeConnected<B> {
334    type Target = Either<Client<B>, Connection<B>>;
335    fn deref(&self) -> &Self::Target {
336        &self.0
337    }
338}
339
340impl<B: Driver> DerefMut for MaybeConnected<B> {
341    fn deref_mut(&mut self) -> &mut Self::Target {
342        &mut self.0
343    }
344}
345
346#[async_trait::async_trait]
347impl<B: Driver> Heartbeat for MaybeConnected<B> {
348    type Error = ClientError;
349    /// send Heartbeat message to keep underline connection/transport alive
350    #[inline]
351    async fn heartbeat(&mut self) -> Result<()> {
352        match self.0 {
353            Either::Left(ref mut c) => c.heartbeat().await,
354            Either::Right(ref mut c) => c.heartbeat().await,
355        }
356    }
357}
358
359#[async_trait::async_trait]
360impl<B: Driver> MessageService for MaybeConnected<B> {
361    type Error = ClientError;
362    /// send message request
363    #[inline]
364    async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
365    where
366        P: Encode + Send + Sync,
367        D: Encode + Send + Sync,
368        R: MessageReplyInterface + Decode<'de> + 'static,
369    {
370        match self.0 {
371            Either::Left(ref mut c) => c.send(mr).await,
372            Either::Right(ref mut c) => c.send(mr).await,
373        }
374    }
375
376    /// close underline connection/transport
377    #[inline]
378    async fn close(&mut self) -> Result<()> {
379        match self.0 {
380            Either::Left(ref mut c) => c.close().await,
381            Either::Right(ref mut c) => c.close().await,
382        }
383    }
384
385    /// underline connection/transport closed?
386    #[inline]
387    fn closed(&self) -> bool {
388        match self.0 {
389            Either::Left(ref c) => c.closed(),
390            Either::Right(ref c) => c.closed(),
391        }
392    }
393}