Skip to main content

rust_ethernet_ip/client/
actor.rs

1use super::EipClient;
2use crate::batch::{BatchError, BatchOperation, BatchResult};
3use crate::error::{EtherNetIpError, Result};
4use crate::monitoring::DiagnosticsSnapshot;
5use crate::route::RoutePath;
6use crate::types::PlcValue;
7use std::time::Duration;
8use tokio::sync::{broadcast, mpsc, oneshot};
9
10type BatchReadResults = Vec<(String, std::result::Result<PlcValue, BatchError>)>;
11type BatchWriteResults = Vec<(String, std::result::Result<(), BatchError>)>;
12
/// Lifecycle notifications published on a client's broadcast channel.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionEvent {
    /// Announced when the background worker starts; `Client::events` may
    /// replay it for new subscribers.
    Connected,
    /// Announced once the worker has left its command loop and attempted to
    /// unregister the session.
    Disconnected,
    /// Final event the worker sends before it returns.
    WorkerStopped,
}
19
20#[derive(Debug, Clone)]
21pub struct Client {
22    tx: mpsc::Sender<ClientCommand>,
23    events: broadcast::Sender<ConnectionEvent>,
24}
25
/// Strategy used to compute the pause between two retry attempts.
#[derive(Debug, Clone)]
pub enum Backoff {
    /// Wait the same duration before every retry.
    Constant(Duration),
    /// Wait `initial * factor^attempt_index`, never longer than `max`.
    Exponential {
        /// Delay before the first retry (attempt index 0).
        initial: Duration,
        /// Upper bound applied to every computed delay.
        max: Duration,
        /// Base of the exponential growth.
        factor: u32,
    },
}

/// Retry configuration consumed by `RetryClient`.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Total number of attempts, counting the initial one.
    pub max_attempts: usize,
    /// How long to wait between attempts.
    pub backoff: Backoff,
    /// Whether write operations are retried; the constructors default this to `false`.
    pub retry_writes: bool,
}

impl RetryPolicy {
    /// Builds a policy that waits `delay` between attempts.
    ///
    /// `max_attempts` is raised to at least 1. Writes are not retried unless
    /// enabled through [`RetryPolicy::retry_writes`].
    pub fn constant(max_attempts: usize, delay: Duration) -> Self {
        Self {
            max_attempts: max_attempts.max(1),
            backoff: Backoff::Constant(delay),
            retry_writes: false,
        }
    }

    /// Builds a policy whose delay starts at `initial` and doubles on every
    /// retry, capped at `max`.
    ///
    /// `max_attempts` is raised to at least 1. Writes are not retried unless
    /// enabled through [`RetryPolicy::retry_writes`].
    pub fn exponential(max_attempts: usize, initial: Duration, max: Duration) -> Self {
        Self {
            max_attempts: max_attempts.max(1),
            backoff: Backoff::Exponential {
                initial,
                max,
                factor: 2,
            },
            retry_writes: false,
        }
    }

    /// Builder-style setter controlling whether write operations are retried.
    pub fn retry_writes(mut self, retry_writes: bool) -> Self {
        self.retry_writes = retry_writes;
        self
    }

    /// Returns the pause to insert after the failed attempt number
    /// `attempt_index` (zero-based).
    ///
    /// All arithmetic saturates, so arbitrarily large indices settle at the
    /// configured `max` instead of overflowing or wrapping.
    fn delay_for_attempt(&self, attempt_index: usize) -> Duration {
        match self.backoff {
            Backoff::Constant(delay) => delay,
            Backoff::Exponential {
                initial,
                max,
                factor,
            } => {
                // Clamp before narrowing: a bare `as u32` would wrap indices
                // above `u32::MAX` back to small exponents and shrink the delay.
                let exponent = attempt_index.min(u32::MAX as usize) as u32;
                let multiplier = factor.saturating_pow(exponent);
                initial.saturating_mul(multiplier).min(max)
            }
        }
    }
}
83
84#[derive(Clone)]
85pub struct RetryClient {
86    client: Client,
87    policy: RetryPolicy,
88}
89
90enum ClientCommand {
91    ReadTag {
92        tag_name: String,
93        reply: oneshot::Sender<Result<PlcValue>>,
94    },
95    WriteTag {
96        tag_name: String,
97        value: PlcValue,
98        reply: oneshot::Sender<Result<()>>,
99    },
100    WriteStringTag {
101        tag_name: String,
102        value: String,
103        reply: oneshot::Sender<Result<()>>,
104    },
105    WriteUdtMember {
106        tag_name: String,
107        member_name: String,
108        value: PlcValue,
109        reply: oneshot::Sender<Result<()>>,
110    },
111    ExecuteBatch {
112        operations: Vec<BatchOperation>,
113        reply: oneshot::Sender<Result<Vec<BatchResult>>>,
114    },
115    ReadTagsBatch {
116        tag_names: Vec<String>,
117        reply: oneshot::Sender<Result<BatchReadResults>>,
118    },
119    WriteTagsBatch {
120        tag_values: Vec<(String, PlcValue)>,
121        reply: oneshot::Sender<Result<BatchWriteResults>>,
122    },
123    CheckHealth {
124        reply: oneshot::Sender<bool>,
125    },
126    Diagnostics {
127        verified: bool,
128        reply: oneshot::Sender<Result<DiagnosticsSnapshot>>,
129    },
130}
131
132impl Client {
133    pub async fn connect(addr: &str) -> Result<Self> {
134        Self::from_eip_client(EipClient::connect(addr).await?)
135    }
136
137    pub async fn with_route_path(addr: &str, route: RoutePath) -> Result<Self> {
138        Self::from_eip_client(EipClient::with_route_path(addr, route).await?)
139    }
140
141    pub fn from_eip_client(client: EipClient) -> Result<Self> {
142        let (tx, rx) = mpsc::channel(128);
143        let (events, _) = broadcast::channel(128);
144        let actor = Self {
145            tx,
146            events: events.clone(),
147        };
148
149        tokio::spawn(run_client_actor(client, rx, events));
150        Ok(actor)
151    }
152
153    pub fn events(&self) -> broadcast::Receiver<ConnectionEvent> {
154        let rx = self.events.subscribe();
155        let _ = self.events.send(ConnectionEvent::Connected);
156        rx
157    }
158
159    pub fn with_retry(&self, policy: RetryPolicy) -> RetryClient {
160        RetryClient {
161            client: self.clone(),
162            policy,
163        }
164    }
165
166    pub async fn read_tag(&self, tag_name: &str) -> Result<PlcValue> {
167        let (reply, rx) = oneshot::channel();
168        self.send(ClientCommand::ReadTag {
169            tag_name: tag_name.to_string(),
170            reply,
171        })
172        .await?;
173        rx.await.unwrap_or_else(|_| actor_stopped())
174    }
175
176    pub async fn write_tag(&self, tag_name: &str, value: PlcValue) -> Result<()> {
177        let (reply, rx) = oneshot::channel();
178        self.send(ClientCommand::WriteTag {
179            tag_name: tag_name.to_string(),
180            value,
181            reply,
182        })
183        .await?;
184        rx.await.unwrap_or_else(|_| actor_stopped())
185    }
186
187    pub async fn write_string_tag(&self, tag_name: &str, value: &str) -> Result<()> {
188        let (reply, rx) = oneshot::channel();
189        self.send(ClientCommand::WriteStringTag {
190            tag_name: tag_name.to_string(),
191            value: value.to_string(),
192            reply,
193        })
194        .await?;
195        rx.await.unwrap_or_else(|_| actor_stopped())
196    }
197
198    pub async fn write_udt_member(
199        &self,
200        udt_tag_name: &str,
201        member_name: &str,
202        value: PlcValue,
203    ) -> Result<()> {
204        let (reply, rx) = oneshot::channel();
205        self.send(ClientCommand::WriteUdtMember {
206            tag_name: udt_tag_name.to_string(),
207            member_name: member_name.to_string(),
208            value,
209            reply,
210        })
211        .await?;
212        rx.await.unwrap_or_else(|_| actor_stopped())
213    }
214
215    pub async fn write_udt_array_member(
216        &self,
217        udt_array_element_path: &str,
218        member_name: &str,
219        value: PlcValue,
220    ) -> Result<()> {
221        self.write_udt_member(udt_array_element_path, member_name, value)
222            .await
223    }
224
225    pub async fn execute_batch(&self, operations: &[BatchOperation]) -> Result<Vec<BatchResult>> {
226        let (reply, rx) = oneshot::channel();
227        self.send(ClientCommand::ExecuteBatch {
228            operations: operations.to_vec(),
229            reply,
230        })
231        .await?;
232        rx.await.unwrap_or_else(|_| actor_stopped())
233    }
234
235    pub async fn read_tags_batch(&self, tag_names: &[&str]) -> Result<BatchReadResults> {
236        let (reply, rx) = oneshot::channel();
237        self.send(ClientCommand::ReadTagsBatch {
238            tag_names: tag_names.iter().map(|name| (*name).to_string()).collect(),
239            reply,
240        })
241        .await?;
242        rx.await.unwrap_or_else(|_| actor_stopped())
243    }
244
245    pub async fn write_tags_batch(
246        &self,
247        tag_values: &[(&str, PlcValue)],
248    ) -> Result<BatchWriteResults> {
249        let (reply, rx) = oneshot::channel();
250        self.send(ClientCommand::WriteTagsBatch {
251            tag_values: tag_values
252                .iter()
253                .map(|(name, value)| ((*name).to_string(), value.clone()))
254                .collect(),
255            reply,
256        })
257        .await?;
258        rx.await.unwrap_or_else(|_| actor_stopped())
259    }
260
261    pub async fn check_health(&self) -> Result<bool> {
262        let (reply, rx) = oneshot::channel();
263        self.send(ClientCommand::CheckHealth { reply }).await?;
264        rx.await.map_err(|_| actor_stopped_error())
265    }
266
267    pub async fn get_diagnostics_snapshot(&self) -> Result<DiagnosticsSnapshot> {
268        self.diagnostics(false).await
269    }
270
271    pub async fn get_diagnostics_snapshot_detailed(&self) -> Result<DiagnosticsSnapshot> {
272        self.diagnostics(true).await
273    }
274
275    async fn diagnostics(&self, verified: bool) -> Result<DiagnosticsSnapshot> {
276        let (reply, rx) = oneshot::channel();
277        self.send(ClientCommand::Diagnostics { verified, reply })
278            .await?;
279        rx.await.unwrap_or_else(|_| actor_stopped())
280    }
281
282    async fn send(&self, command: ClientCommand) -> Result<()> {
283        self.tx
284            .send(command)
285            .await
286            .map_err(|_| actor_stopped_error())
287    }
288}
289
290impl RetryClient {
291    pub async fn read_tag(&self, tag_name: &str) -> Result<PlcValue> {
292        self.retry(|| async { self.client.read_tag(tag_name).await })
293            .await
294    }
295
296    pub async fn write_tag(&self, tag_name: &str, value: PlcValue) -> Result<()> {
297        if !self.policy.retry_writes {
298            return self.client.write_tag(tag_name, value).await;
299        }
300
301        self.retry(|| {
302            let value = value.clone();
303            async move { self.client.write_tag(tag_name, value).await }
304        })
305        .await
306    }
307
308    pub async fn write_string_tag(&self, tag_name: &str, value: &str) -> Result<()> {
309        if !self.policy.retry_writes {
310            return self.client.write_string_tag(tag_name, value).await;
311        }
312
313        self.retry(|| async { self.client.write_string_tag(tag_name, value).await })
314            .await
315    }
316
317    async fn retry<T, Fut, Op>(&self, mut op: Op) -> Result<T>
318    where
319        Fut: std::future::Future<Output = Result<T>>,
320        Op: FnMut() -> Fut,
321    {
322        let mut attempt = 0;
323        loop {
324            match op().await {
325                Ok(value) => return Ok(value),
326                Err(err) if err.is_retriable() && attempt + 1 < self.policy.max_attempts => {
327                    let delay = self.policy.delay_for_attempt(attempt);
328                    attempt += 1;
329                    tokio::time::sleep(delay).await;
330                }
331                Err(err) => return Err(err),
332            }
333        }
334    }
335}
336
337async fn run_client_actor(
338    mut client: EipClient,
339    mut rx: mpsc::Receiver<ClientCommand>,
340    events: broadcast::Sender<ConnectionEvent>,
341) {
342    let _ = events.send(ConnectionEvent::Connected);
343
344    while let Some(command) = rx.recv().await {
345        match command {
346            ClientCommand::ReadTag { tag_name, reply } => {
347                let _ = reply.send(client.read_tag(&tag_name).await);
348            }
349            ClientCommand::WriteTag {
350                tag_name,
351                value,
352                reply,
353            } => {
354                let _ = reply.send(client.write_tag(&tag_name, value).await);
355            }
356            ClientCommand::WriteStringTag {
357                tag_name,
358                value,
359                reply,
360            } => {
361                let _ = reply.send(client.write_string_tag(&tag_name, &value).await);
362            }
363            ClientCommand::WriteUdtMember {
364                tag_name,
365                member_name,
366                value,
367                reply,
368            } => {
369                let _ = reply.send(
370                    client
371                        .write_udt_member(&tag_name, &member_name, value)
372                        .await,
373                );
374            }
375            ClientCommand::ExecuteBatch { operations, reply } => {
376                let _ = reply.send(client.execute_batch(&operations).await);
377            }
378            ClientCommand::ReadTagsBatch { tag_names, reply } => {
379                let refs: Vec<&str> = tag_names.iter().map(String::as_str).collect();
380                let _ = reply.send(client.read_tags_batch(&refs).await);
381            }
382            ClientCommand::WriteTagsBatch { tag_values, reply } => {
383                let refs: Vec<(&str, PlcValue)> = tag_values
384                    .iter()
385                    .map(|(name, value)| (name.as_str(), value.clone()))
386                    .collect();
387                let _ = reply.send(client.write_tags_batch(&refs).await);
388            }
389            ClientCommand::CheckHealth { reply } => {
390                let _ = reply.send(client.check_health().await);
391            }
392            ClientCommand::Diagnostics { verified, reply } => {
393                let result = if verified {
394                    client.get_diagnostics_snapshot_detailed().await
395                } else {
396                    Ok(client.get_diagnostics_snapshot().await)
397                };
398                let _ = reply.send(result);
399            }
400        }
401    }
402
403    let _ = client.unregister_session().await;
404    let _ = events.send(ConnectionEvent::Disconnected);
405    let _ = events.send(ConnectionEvent::WorkerStopped);
406}
407
408fn actor_stopped<T>() -> Result<T> {
409    Err(actor_stopped_error())
410}
411
412fn actor_stopped_error() -> EtherNetIpError {
413    EtherNetIpError::ConnectionLost("client actor stopped".to_string())
414}