Skip to main content

rust_ethernet_ip/client/
subscriptions.rs

1use super::EipClient;
2use crate::error::{EtherNetIpError, Result};
3use crate::subscription::{SubscriptionOptions, TagSubscription};
4use crate::tag_group::{
5    TagGroupConfig, TagGroupEvent, TagGroupEventKind, TagGroupFailureDiagnostic, TagGroupSnapshot,
6    TagGroupSubscription, TagGroupValueResult,
7};
8use crate::types::PlcValue;
9
10impl EipClient {
11    /// Subscribes to a tag for real-time updates.
12    ///
13    /// The returned [`TagSubscription`] can be used to:
14    /// - [`wait_for_update()`](TagSubscription::wait_for_update) for the next value
15    /// - [`get_last_value()`](TagSubscription::get_last_value) for the latest cached value
16    /// - [`into_stream()`](TagSubscription::into_stream) for an async `Stream` of updates
17    ///
18    /// This API validates the tag with an initial read before returning so invalid or
19    /// inaccessible tags fail fast instead of surfacing only through background polling logs.
20    ///
21    /// The background poll loop uses [`SubscriptionOptions::update_rate`] (milliseconds) between reads.
22    pub async fn subscribe_to_tag(
23        &self,
24        tag_path: &str,
25        options: SubscriptionOptions,
26    ) -> Result<TagSubscription> {
27        let subscription = TagSubscription::new(tag_path.to_string(), options.clone());
28        let mut validation_client = self.clone();
29        let initial_value = validation_client.read_tag(tag_path).await?;
30        subscription.update_value(&initial_value).await?;
31
32        let mut subscriptions = self.subscriptions.lock().await;
33        let update_rate_ms = options.update_rate;
34        subscriptions.push(subscription.clone());
35        drop(subscriptions);
36
37        let tag_path = tag_path.to_string();
38        let mut client = self.clone();
39        tokio::spawn(async move {
40            let interval = std::time::Duration::from_millis(update_rate_ms as u64);
41            loop {
42                match client.read_tag(&tag_path).await {
43                    Ok(value) => {
44                        if let Err(e) = client.update_subscription(&tag_path, &value).await {
45                            tracing::error!("Error updating subscription: {}", e);
46                            break;
47                        }
48                    }
49                    Err(e) => {
50                        tracing::error!("Error reading tag {}: {}", tag_path, e);
51                        break;
52                    }
53                }
54                tokio::time::sleep(interval).await;
55            }
56        });
57        Ok(subscription)
58    }
59
60    /// Subscribes to multiple tags. Returns one [`TagSubscription`] per tag in order.
61    pub async fn subscribe_to_tags(
62        &self,
63        tags: &[(&str, SubscriptionOptions)],
64    ) -> Result<Vec<TagSubscription>> {
65        let mut subs = Vec::with_capacity(tags.len());
66        for (tag_name, options) in tags {
67            subs.push(self.subscribe_to_tag(tag_name, options.clone()).await?);
68        }
69        Ok(subs)
70    }
71
72    /// Registers or replaces a named tag group for grouped polling.
73    ///
74    /// Tag groups are useful for HMI/SCADA-style scan classes where multiple tags
75    /// share a polling interval and should be read together.
76    pub async fn upsert_tag_group(
77        &self,
78        group_name: &str,
79        tags: &[&str],
80        update_rate_ms: u32,
81    ) -> Result<()> {
82        if group_name.trim().is_empty() {
83            return Err(EtherNetIpError::Protocol(
84                "Tag group name cannot be empty".to_string(),
85            ));
86        }
87        if tags.is_empty() {
88            return Err(EtherNetIpError::Protocol(
89                "Tag group must contain at least one tag".to_string(),
90            ));
91        }
92        if update_rate_ms == 0 {
93            return Err(EtherNetIpError::Protocol(
94                "Tag group update rate must be greater than 0 ms".to_string(),
95            ));
96        }
97
98        let config = TagGroupConfig {
99            name: group_name.to_string(),
100            tags: tags.iter().map(|s| (*s).to_string()).collect(),
101            update_rate_ms,
102        };
103
104        let mut groups = self.tag_groups.lock().await;
105        groups.insert(group_name.to_string(), config);
106        Ok(())
107    }
108
109    /// Removes a named tag group.
110    pub async fn remove_tag_group(&self, group_name: &str) -> bool {
111        let mut groups = self.tag_groups.lock().await;
112        groups.remove(group_name).is_some()
113    }
114
115    /// Lists all currently registered tag groups.
116    pub async fn list_tag_groups(&self) -> Vec<TagGroupConfig> {
117        let groups = self.tag_groups.lock().await;
118        groups.values().cloned().collect()
119    }
120
121    /// Reads all tags in a group once and returns a snapshot.
122    pub async fn read_tag_group_once(&self, group_name: &str) -> Result<TagGroupSnapshot> {
123        let config = {
124            let groups = self.tag_groups.lock().await;
125            groups.get(group_name).cloned().ok_or_else(|| {
126                EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
127            })?
128        };
129
130        let mut client = self.clone();
131        let tag_refs: Vec<&str> = config.tags.iter().map(String::as_str).collect();
132        let values = client.read_tags_batch(&tag_refs).await?;
133
134        let mapped = values
135            .into_iter()
136            .map(|(tag_name, result)| match result {
137                Ok(value) => TagGroupValueResult {
138                    tag_name,
139                    value: Some(value),
140                    error: None,
141                },
142                Err(e) => TagGroupValueResult {
143                    tag_name,
144                    value: None,
145                    error: Some(e.to_string()),
146                },
147            })
148            .collect();
149
150        Ok(TagGroupSnapshot {
151            group_name: config.name,
152            sampled_at: std::time::SystemTime::now(),
153            values: mapped,
154        })
155    }
156
157    /// Starts background polling for a registered tag group.
158    ///
159    /// Use the returned subscription to await snapshots and to stop polling.
160    pub async fn subscribe_tag_group(&self, group_name: &str) -> Result<TagGroupSubscription> {
161        let config = {
162            let groups = self.tag_groups.lock().await;
163            groups.get(group_name).cloned().ok_or_else(|| {
164                EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
165            })?
166        };
167
168        let subscription = TagGroupSubscription::new(config.name.clone(), config.update_rate_ms);
169        let subscription_task = subscription.clone();
170        let mut client = self.clone();
171        let tags = config.tags.clone();
172        let interval = std::time::Duration::from_millis(config.update_rate_ms as u64);
173        let group_name_owned = config.name.clone();
174
175        tokio::spawn(async move {
176            while subscription_task.is_active() {
177                let tag_refs: Vec<&str> = tags.iter().map(String::as_str).collect();
178                match client.read_tags_batch(&tag_refs).await {
179                    Ok(values) => {
180                        let has_errors = values.iter().any(|(_, result)| result.is_err());
181                        let snapshot = TagGroupSnapshot {
182                            group_name: group_name_owned.clone(),
183                            sampled_at: std::time::SystemTime::now(),
184                            values: values
185                                .into_iter()
186                                .map(|(tag_name, result)| match result {
187                                    Ok(value) => TagGroupValueResult {
188                                        tag_name,
189                                        value: Some(value),
190                                        error: None,
191                                    },
192                                    Err(e) => TagGroupValueResult {
193                                        tag_name,
194                                        value: None,
195                                        error: Some(e.to_string()),
196                                    },
197                                })
198                                .collect(),
199                        };
200
201                        let event = TagGroupEvent {
202                            kind: if has_errors {
203                                TagGroupEventKind::PartialError
204                            } else {
205                                TagGroupEventKind::Data
206                            },
207                            snapshot,
208                            error: None,
209                            failure: None,
210                        };
211
212                        if let Err(e) = subscription_task.publish_event(event).await {
213                            tracing::error!(
214                                "Tag group '{}' publish failed: {}",
215                                group_name_owned,
216                                e
217                            );
218                            break;
219                        }
220                    }
221                    Err(e) => {
222                        tracing::error!(
223                            "Tag group '{}' polling read failed: {}",
224                            group_name_owned,
225                            e
226                        );
227                        let failure_event = TagGroupEvent {
228                            kind: TagGroupEventKind::ReadFailure,
229                            snapshot: TagGroupSnapshot {
230                                group_name: group_name_owned.clone(),
231                                sampled_at: std::time::SystemTime::now(),
232                                values: Vec::new(),
233                            },
234                            error: Some(e.to_string()),
235                            failure: Some(TagGroupFailureDiagnostic::from_error(&e)),
236                        };
237                        if let Err(publish_error) =
238                            subscription_task.publish_event(failure_event).await
239                        {
240                            tracing::error!(
241                                "Tag group '{}' failure-event publish failed: {}",
242                                group_name_owned,
243                                publish_error
244                            );
245                            break;
246                        }
247                    }
248                }
249                tokio::time::sleep(interval).await;
250            }
251        });
252
253        Ok(subscription)
254    }
255
256    async fn update_subscription(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
257        let subscriptions = {
258            let subscriptions = self.subscriptions.lock().await;
259            subscriptions.clone()
260        };
261        for subscription in &subscriptions {
262            if subscription.tag_path == tag_name && subscription.is_active() {
263                subscription.update_value(value).await?;
264            }
265        }
266        Ok(())
267    }
268}