1pub mod ab_eip;
9pub mod eip;
11
12use crate::{adapters::Service, ClientError, Result};
13pub use ab_eip::{AbEipClient, AbEipConnection, AbEipDriver, AbService, AbTemplateService};
14use bytes::Bytes;
15use core::{
16 fmt,
17 ops::{Deref, DerefMut},
18};
19pub use eip::*;
20use futures_util::future::BoxFuture;
21pub use rseip_cip::connection::OpenOptions;
23use rseip_cip::{
24 connection::ForwardCloseRequest,
25 service::Heartbeat,
26 service::{request::UnconnectedSend, MessageService},
27 *,
28};
29use rseip_core::{
30 codec::{Decode, Encode},
31 Either, Error,
32};
33use std::{io, sync::atomic::AtomicU16};
34
35pub trait Driver: Send + Sync {
37 type Endpoint: fmt::Debug + Clone + Send + Sync;
39
40 type Service: Service + fmt::Debug + Send + Sync;
42
43 fn build_service(addr: Self::Endpoint) -> BoxFuture<'static, Result<Self::Service>>;
45}
46
47#[derive(Debug, Default)]
49pub struct Client<B: Driver> {
50 addr: B::Endpoint,
52 service: Option<B::Service>,
54 connection_path: EPath,
56}
57
58impl<B: Driver> Client<B> {
59 #[inline]
61 pub fn new(addr: B::Endpoint) -> Self {
62 Self {
63 addr,
64 service: None,
65 connection_path: Default::default(),
66 }
67 }
68
69 #[inline]
71 pub fn with_connection_path(mut self, path: impl Into<EPath>) -> Self {
72 self.connection_path = path.into();
73 self
74 }
75
76 #[inline]
78 pub fn connection_path(&self) -> &EPath {
79 &self.connection_path
80 }
81
82 #[inline]
84 pub fn remote_endpoint(&self) -> &B::Endpoint {
85 &self.addr
86 }
87
88 #[inline]
89 async fn ensure_service(&mut self) -> Result<()> {
90 if self.service.is_none() {
91 let service = B::build_service(self.addr.clone()).await?;
92 self.service = Some(service);
93 }
94 match self.service {
95 None => unreachable!(),
96 Some(ref mut service) => {
97 service.open().await?;
98 }
99 }
100 Ok(())
101 }
102}
103
104#[async_trait::async_trait]
105impl<B: Driver> Heartbeat for Client<B> {
106 type Error = ClientError;
107 #[inline]
109 async fn heartbeat(&mut self) -> Result<()> {
110 if let Some(ref mut service) = self.service {
111 service.heartbeat().await?;
112 }
113 Ok(())
114 }
115}
116
117#[async_trait::async_trait]
119impl<B: Driver> MessageService for Client<B> {
120 type Error = ClientError;
121
122 #[inline]
124 async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
125 where
126 P: Encode + Send + Sync,
127 D: Encode + Send + Sync,
128 R: MessageReplyInterface + Decode<'de> + 'static,
129 {
130 self.ensure_service().await?;
132 let service = self.service.as_mut().expect("expected service");
133 let req = UnconnectedSend::new(self.connection_path.clone(), mr);
134 let res = service.unconnected_send(req).await?;
135 Ok(res)
136 }
137
138 #[inline]
140 async fn close(&mut self) -> Result<()> {
141 if let Some(mut service) = self.service.take() {
142 let _ = service.close().await;
143 }
144 Ok(())
145 }
146
147 #[inline]
149 fn closed(&self) -> bool {
150 self.service.is_none()
151 }
152}
153
154#[derive(Debug)]
156pub struct Connection<B: Driver> {
157 addr: B::Endpoint,
158 origin_options: OpenOptions,
159 connected_options: Option<OpenOptions>,
160 service: Option<B::Service>,
162 seq_id: AtomicU16,
164}
165
166impl<B: Driver> Connection<B> {
167 #[inline]
169 pub fn new(addr: B::Endpoint, options: OpenOptions) -> Self {
170 Self {
171 addr,
172 origin_options: options,
173 connected_options: None,
174 service: None,
175 seq_id: Default::default(),
176 }
177 }
178
179 #[inline]
181 pub fn remote_endpoint(&self) -> &B::Endpoint {
182 &self.addr
183 }
184
185 #[inline]
187 pub fn connection_id(&self) -> Option<u32> {
188 self.connected_options.as_ref().map(|v| v.o_t_connection_id)
189 }
190
191 #[inline]
193 pub fn connected(&self) -> bool {
194 self.connection_id().is_some()
195 }
196
197 #[inline]
199 fn next_sequence_number(&mut self) -> u16 {
200 loop {
201 let v = self
202 .seq_id
203 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
204 if v > 0 {
206 return v;
207 }
208 }
209 }
210
211 #[inline]
213 pub async fn reconnect(&mut self) -> Result<()> {
214 self.close_connection().await?;
215 self.open_connection().await?;
216 Ok(())
217 }
218
219 #[inline]
220 async fn ensure_service(&mut self) -> Result<()> {
221 if self.service.is_none() {
222 let service = B::build_service(self.addr.clone()).await?;
223 self.service = Some(service);
224 }
225 match self.service {
226 None => unreachable!(),
227 Some(ref mut service) => {
228 service.open().await?;
229 }
230 }
231 Ok(())
232 }
233
234 #[inline]
236 async fn open_connection(&mut self) -> Result<u32> {
237 self.ensure_service().await?;
239 let service = self.service.as_mut().expect("expected service");
240 if self.connected_options.is_none() {
241 let reply = service.forward_open(self.origin_options.clone()).await?;
242 match reply.into_value() {
243 Either::Left(reply) => {
244 let opts = self
245 .origin_options
246 .clone()
247 .o_t_connection_id(reply.o_t_connection_id)
248 .connection_serial_number(reply.connection_serial_number)
249 .o_t_rpi(reply.o_t_api)
250 .t_o_rpi(reply.t_o_api);
251 self.connected_options = Some(opts);
252 }
253 Either::Right(_) => return Err(Error::custom("forward open failed")),
254 }
255 }
256 Ok(self.connection_id().unwrap())
257 }
258
259 #[inline]
261 async fn close_connection(&mut self) -> Result<()> {
262 if let Some(conn) = self.connected_options.take() {
263 if let Some(service) = self.service.as_mut() {
264 let request = ForwardCloseRequest {
265 priority_time_ticks: self.origin_options.priority_tick_time,
266 timeout_ticks: self.origin_options.timeout_ticks,
267 connection_serial_number: conn.connection_serial_number,
268 originator_serial_number: conn.originator_serial_number,
269 originator_vendor_id: conn.vendor_id,
270 connection_path: conn.connection_path,
271 };
272 let _ = service.forward_close(request).await;
273 }
274 }
275 Ok(())
276 }
277}
278
279#[async_trait::async_trait]
280impl<B: Driver> Heartbeat for Connection<B> {
281 type Error = ClientError;
282
283 #[inline]
285 async fn heartbeat(&mut self) -> Result<()> {
286 if let Some(ref mut service) = self.service {
287 service.heartbeat().await?;
288 }
290 Ok(())
291 }
292}
293
294#[async_trait::async_trait]
295impl<B: Driver> MessageService for Connection<B> {
296 type Error = ClientError;
297 #[inline]
299 async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
300 where
301 P: Encode + Send + Sync,
302 D: Encode + Send + Sync,
303 R: MessageReplyInterface + Decode<'de> + 'static,
304 {
305 let cid = self.open_connection().await?;
307 let sid = self.next_sequence_number();
308 let service = self.service.as_mut().expect("expected service");
309 service.connected_send(cid, sid, mr).await
310 }
311
312 #[inline]
314 async fn close(&mut self) -> Result<()> {
315 let _ = self.close_connection().await;
316 if let Some(mut service) = self.service.take() {
317 let _ = service.close().await;
318 }
319 Ok(())
320 }
321
322 #[inline]
324 fn closed(&self) -> bool {
325 self.connected_options.is_none()
326 }
327}
328
329#[derive(Debug)]
331pub struct MaybeConnected<B: Driver>(Either<Client<B>, Connection<B>>);
332
333impl<B: Driver> Deref for MaybeConnected<B> {
334 type Target = Either<Client<B>, Connection<B>>;
335 fn deref(&self) -> &Self::Target {
336 &self.0
337 }
338}
339
340impl<B: Driver> DerefMut for MaybeConnected<B> {
341 fn deref_mut(&mut self) -> &mut Self::Target {
342 &mut self.0
343 }
344}
345
346#[async_trait::async_trait]
347impl<B: Driver> Heartbeat for MaybeConnected<B> {
348 type Error = ClientError;
349 #[inline]
351 async fn heartbeat(&mut self) -> Result<()> {
352 match self.0 {
353 Either::Left(ref mut c) => c.heartbeat().await,
354 Either::Right(ref mut c) => c.heartbeat().await,
355 }
356 }
357}
358
359#[async_trait::async_trait]
360impl<B: Driver> MessageService for MaybeConnected<B> {
361 type Error = ClientError;
362 #[inline]
364 async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R>
365 where
366 P: Encode + Send + Sync,
367 D: Encode + Send + Sync,
368 R: MessageReplyInterface + Decode<'de> + 'static,
369 {
370 match self.0 {
371 Either::Left(ref mut c) => c.send(mr).await,
372 Either::Right(ref mut c) => c.send(mr).await,
373 }
374 }
375
376 #[inline]
378 async fn close(&mut self) -> Result<()> {
379 match self.0 {
380 Either::Left(ref mut c) => c.close().await,
381 Either::Right(ref mut c) => c.close().await,
382 }
383 }
384
385 #[inline]
387 fn closed(&self) -> bool {
388 match self.0 {
389 Either::Left(ref c) => c.closed(),
390 Either::Right(ref c) => c.closed(),
391 }
392 }
393}