1use super::EipClient;
2use crate::batch::{BatchError, BatchOperation, BatchResult};
3use crate::error::{EtherNetIpError, Result};
4use crate::monitoring::DiagnosticsSnapshot;
5use crate::route::RoutePath;
6use crate::types::PlcValue;
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, oneshot};
9
10type BatchReadResults = Vec<(String, std::result::Result<PlcValue, BatchError>)>;
11type BatchWriteResults = Vec<(String, std::result::Result<(), BatchError>)>;
12
/// Lifecycle notifications published on the client's broadcast channel.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionEvent {
    /// Published by the background worker when it starts; `Client::events`
    /// also publishes it when a new subscription is created.
    Connected,
    /// Published by the worker after its command loop has ended and it has
    /// attempted to unregister the session.
    Disconnected,
    /// Final event published by the worker, right after `Disconnected`.
    WorkerStopped,
}
19
20#[derive(Debug, Clone)]
21pub struct Client {
22 tx: mpsc::Sender<ClientCommand>,
23 events: broadcast::Sender<ConnectionEvent>,
24}
25
/// Strategy used to compute the pause between two retry attempts.
#[derive(Debug, Clone)]
pub enum Backoff {
    /// Wait the same duration before every retry.
    Constant(Duration),
    /// Wait `initial * factor^attempt`, never longer than `max`.
    Exponential {
        /// Delay before the first retry.
        initial: Duration,
        /// Upper bound applied to every computed delay.
        max: Duration,
        /// Multiplier applied once per additional failed attempt.
        factor: u32,
    },
}

/// Retry configuration consumed by `RetryClient`.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Total number of attempts (first try included).
    pub max_attempts: usize,
    /// How long to wait between attempts.
    pub backoff: Backoff,
    /// Whether write operations are retried as well; `false` by default
    /// in both constructors.
    pub retry_writes: bool,
}

impl RetryPolicy {
    /// Builds a policy that waits `delay` between attempts.
    ///
    /// `max_attempts` is raised to at least 1; writes are not retried unless
    /// enabled through [`RetryPolicy::retry_writes`].
    pub fn constant(max_attempts: usize, delay: Duration) -> Self {
        Self {
            max_attempts: max_attempts.max(1),
            backoff: Backoff::Constant(delay),
            retry_writes: false,
        }
    }

    /// Builds a policy whose delay starts at `initial` and doubles after every
    /// failed attempt, capped at `max`.
    ///
    /// `max_attempts` is raised to at least 1; writes are not retried unless
    /// enabled through [`RetryPolicy::retry_writes`].
    pub fn exponential(max_attempts: usize, initial: Duration, max: Duration) -> Self {
        Self {
            max_attempts: max_attempts.max(1),
            backoff: Backoff::Exponential {
                initial,
                max,
                factor: 2,
            },
            retry_writes: false,
        }
    }

    /// Builder-style setter for the `retry_writes` flag.
    pub fn retry_writes(mut self, retry_writes: bool) -> Self {
        self.retry_writes = retry_writes;
        self
    }

    /// Returns the pause to insert after the failed attempt number
    /// `attempt_index` (zero-based).
    ///
    /// All arithmetic saturates, so arbitrarily large indices yield `max`
    /// (for exponential backoff) instead of overflowing or wrapping.
    fn delay_for_attempt(&self, attempt_index: usize) -> Duration {
        match self.backoff {
            Backoff::Constant(delay) => delay,
            Backoff::Exponential {
                initial,
                max,
                factor,
            } => {
                // A plain `as u32` cast would wrap indices above `u32::MAX`
                // back to a tiny exponent; saturate instead.
                let exponent = <u32 as std::convert::TryFrom<usize>>::try_from(attempt_index)
                    .unwrap_or(u32::MAX);
                let multiplier = factor.saturating_pow(exponent);
                initial.saturating_mul(multiplier).min(max)
            }
        }
    }
}
83
84#[derive(Clone)]
85pub struct RetryClient {
86 client: Client,
87 policy: RetryPolicy,
88}
89
90enum ClientCommand {
91 ReadTag {
92 tag_name: String,
93 reply: oneshot::Sender<Result<PlcValue>>,
94 },
95 WriteTag {
96 tag_name: String,
97 value: PlcValue,
98 reply: oneshot::Sender<Result<()>>,
99 },
100 WriteStringTag {
101 tag_name: String,
102 value: String,
103 reply: oneshot::Sender<Result<()>>,
104 },
105 WriteUdtMember {
106 tag_name: String,
107 member_name: String,
108 value: PlcValue,
109 reply: oneshot::Sender<Result<()>>,
110 },
111 ExecuteBatch {
112 operations: Vec<BatchOperation>,
113 reply: oneshot::Sender<Result<Vec<BatchResult>>>,
114 },
115 ReadTagsBatch {
116 tag_names: Vec<String>,
117 reply: oneshot::Sender<Result<BatchReadResults>>,
118 },
119 WriteTagsBatch {
120 tag_values: Vec<(String, PlcValue)>,
121 reply: oneshot::Sender<Result<BatchWriteResults>>,
122 },
123 CheckHealth {
124 reply: oneshot::Sender<bool>,
125 },
126 Diagnostics {
127 verified: bool,
128 reply: oneshot::Sender<Result<DiagnosticsSnapshot>>,
129 },
130}
131
132impl Client {
133 pub async fn connect(addr: &str) -> Result<Self> {
134 Self::from_eip_client(EipClient::connect(addr).await?)
135 }
136
137 pub async fn with_route_path(addr: &str, route: RoutePath) -> Result<Self> {
138 Self::from_eip_client(EipClient::with_route_path(addr, route).await?)
139 }
140
141 pub fn from_eip_client(client: EipClient) -> Result<Self> {
142 let (tx, rx) = mpsc::channel(128);
143 let (events, _) = broadcast::channel(128);
144 let actor = Self {
145 tx,
146 events: events.clone(),
147 };
148
149 tokio::spawn(run_client_actor(client, rx, events));
150 Ok(actor)
151 }
152
153 pub fn events(&self) -> broadcast::Receiver<ConnectionEvent> {
154 let rx = self.events.subscribe();
155 let _ = self.events.send(ConnectionEvent::Connected);
156 rx
157 }
158
159 pub fn with_retry(&self, policy: RetryPolicy) -> RetryClient {
160 RetryClient {
161 client: self.clone(),
162 policy,
163 }
164 }
165
166 pub async fn read_tag(&self, tag_name: &str) -> Result<PlcValue> {
167 let (reply, rx) = oneshot::channel();
168 self.send(ClientCommand::ReadTag {
169 tag_name: tag_name.to_string(),
170 reply,
171 })
172 .await?;
173 rx.await.unwrap_or_else(|_| actor_stopped())
174 }
175
176 pub async fn write_tag(&self, tag_name: &str, value: PlcValue) -> Result<()> {
177 let (reply, rx) = oneshot::channel();
178 self.send(ClientCommand::WriteTag {
179 tag_name: tag_name.to_string(),
180 value,
181 reply,
182 })
183 .await?;
184 rx.await.unwrap_or_else(|_| actor_stopped())
185 }
186
187 pub async fn write_string_tag(&self, tag_name: &str, value: &str) -> Result<()> {
188 let (reply, rx) = oneshot::channel();
189 self.send(ClientCommand::WriteStringTag {
190 tag_name: tag_name.to_string(),
191 value: value.to_string(),
192 reply,
193 })
194 .await?;
195 rx.await.unwrap_or_else(|_| actor_stopped())
196 }
197
198 pub async fn write_udt_member(
199 &self,
200 udt_tag_name: &str,
201 member_name: &str,
202 value: PlcValue,
203 ) -> Result<()> {
204 let (reply, rx) = oneshot::channel();
205 self.send(ClientCommand::WriteUdtMember {
206 tag_name: udt_tag_name.to_string(),
207 member_name: member_name.to_string(),
208 value,
209 reply,
210 })
211 .await?;
212 rx.await.unwrap_or_else(|_| actor_stopped())
213 }
214
215 pub async fn write_udt_array_member(
216 &self,
217 udt_array_element_path: &str,
218 member_name: &str,
219 value: PlcValue,
220 ) -> Result<()> {
221 self.write_udt_member(udt_array_element_path, member_name, value)
222 .await
223 }
224
225 pub async fn execute_batch(&self, operations: &[BatchOperation]) -> Result<Vec<BatchResult>> {
226 let (reply, rx) = oneshot::channel();
227 self.send(ClientCommand::ExecuteBatch {
228 operations: operations.to_vec(),
229 reply,
230 })
231 .await?;
232 rx.await.unwrap_or_else(|_| actor_stopped())
233 }
234
235 pub async fn read_tags_batch(&self, tag_names: &[&str]) -> Result<BatchReadResults> {
236 let (reply, rx) = oneshot::channel();
237 self.send(ClientCommand::ReadTagsBatch {
238 tag_names: tag_names.iter().map(|name| (*name).to_string()).collect(),
239 reply,
240 })
241 .await?;
242 rx.await.unwrap_or_else(|_| actor_stopped())
243 }
244
245 pub async fn write_tags_batch(
246 &self,
247 tag_values: &[(&str, PlcValue)],
248 ) -> Result<BatchWriteResults> {
249 let (reply, rx) = oneshot::channel();
250 self.send(ClientCommand::WriteTagsBatch {
251 tag_values: tag_values
252 .iter()
253 .map(|(name, value)| ((*name).to_string(), value.clone()))
254 .collect(),
255 reply,
256 })
257 .await?;
258 rx.await.unwrap_or_else(|_| actor_stopped())
259 }
260
261 pub async fn check_health(&self) -> Result<bool> {
262 let (reply, rx) = oneshot::channel();
263 self.send(ClientCommand::CheckHealth { reply }).await?;
264 rx.await.map_err(|_| actor_stopped_error())
265 }
266
267 pub async fn get_diagnostics_snapshot(&self) -> Result<DiagnosticsSnapshot> {
268 self.diagnostics(false).await
269 }
270
271 pub async fn get_diagnostics_snapshot_detailed(&self) -> Result<DiagnosticsSnapshot> {
272 self.diagnostics(true).await
273 }
274
275 async fn diagnostics(&self, verified: bool) -> Result<DiagnosticsSnapshot> {
276 let (reply, rx) = oneshot::channel();
277 self.send(ClientCommand::Diagnostics { verified, reply })
278 .await?;
279 rx.await.unwrap_or_else(|_| actor_stopped())
280 }
281
282 async fn send(&self, command: ClientCommand) -> Result<()> {
283 self.tx
284 .send(command)
285 .await
286 .map_err(|_| actor_stopped_error())
287 }
288}
289
290impl RetryClient {
291 pub async fn read_tag(&self, tag_name: &str) -> Result<PlcValue> {
292 self.retry(|| async { self.client.read_tag(tag_name).await })
293 .await
294 }
295
296 pub async fn write_tag(&self, tag_name: &str, value: PlcValue) -> Result<()> {
297 if !self.policy.retry_writes {
298 return self.client.write_tag(tag_name, value).await;
299 }
300
301 self.retry(|| {
302 let value = value.clone();
303 async move { self.client.write_tag(tag_name, value).await }
304 })
305 .await
306 }
307
308 pub async fn write_string_tag(&self, tag_name: &str, value: &str) -> Result<()> {
309 if !self.policy.retry_writes {
310 return self.client.write_string_tag(tag_name, value).await;
311 }
312
313 self.retry(|| async { self.client.write_string_tag(tag_name, value).await })
314 .await
315 }
316
317 async fn retry<T, Fut, Op>(&self, mut op: Op) -> Result<T>
318 where
319 Fut: std::future::Future<Output = Result<T>>,
320 Op: FnMut() -> Fut,
321 {
322 let mut attempt = 0;
323 loop {
324 match op().await {
325 Ok(value) => return Ok(value),
326 Err(err) if err.is_retriable() && attempt + 1 < self.policy.max_attempts => {
327 let delay = self.policy.delay_for_attempt(attempt);
328 attempt += 1;
329 tokio::time::sleep(delay).await;
330 }
331 Err(err) => return Err(err),
332 }
333 }
334 }
335}
336
337async fn run_client_actor(
338 mut client: EipClient,
339 mut rx: mpsc::Receiver<ClientCommand>,
340 events: broadcast::Sender<ConnectionEvent>,
341) {
342 let _ = events.send(ConnectionEvent::Connected);
343
344 while let Some(command) = rx.recv().await {
345 match command {
346 ClientCommand::ReadTag { tag_name, reply } => {
347 let _ = reply.send(client.read_tag(&tag_name).await);
348 }
349 ClientCommand::WriteTag {
350 tag_name,
351 value,
352 reply,
353 } => {
354 let _ = reply.send(client.write_tag(&tag_name, value).await);
355 }
356 ClientCommand::WriteStringTag {
357 tag_name,
358 value,
359 reply,
360 } => {
361 let _ = reply.send(client.write_string_tag(&tag_name, &value).await);
362 }
363 ClientCommand::WriteUdtMember {
364 tag_name,
365 member_name,
366 value,
367 reply,
368 } => {
369 let _ = reply.send(
370 client
371 .write_udt_member(&tag_name, &member_name, value)
372 .await,
373 );
374 }
375 ClientCommand::ExecuteBatch { operations, reply } => {
376 let _ = reply.send(client.execute_batch(&operations).await);
377 }
378 ClientCommand::ReadTagsBatch { tag_names, reply } => {
379 let refs: Vec<&str> = tag_names.iter().map(String::as_str).collect();
380 let _ = reply.send(client.read_tags_batch(&refs).await);
381 }
382 ClientCommand::WriteTagsBatch { tag_values, reply } => {
383 let refs: Vec<(&str, PlcValue)> = tag_values
384 .iter()
385 .map(|(name, value)| (name.as_str(), value.clone()))
386 .collect();
387 let _ = reply.send(client.write_tags_batch(&refs).await);
388 }
389 ClientCommand::CheckHealth { reply } => {
390 let _ = reply.send(client.check_health().await);
391 }
392 ClientCommand::Diagnostics { verified, reply } => {
393 let result = if verified {
394 client.get_diagnostics_snapshot_detailed().await
395 } else {
396 Ok(client.get_diagnostics_snapshot().await)
397 };
398 let _ = reply.send(result);
399 }
400 }
401 }
402
403 let _ = client.unregister_session().await;
404 let _ = events.send(ConnectionEvent::Disconnected);
405 let _ = events.send(ConnectionEvent::WorkerStopped);
406}
407
408fn actor_stopped<T>() -> Result<T> {
409 Err(actor_stopped_error())
410}
411
412fn actor_stopped_error() -> EtherNetIpError {
413 EtherNetIpError::ConnectionLost("client actor stopped".to_string())
414}