rust_ethernet_ip/client/
subscriptions.rs1use super::EipClient;
2use crate::error::{EtherNetIpError, Result};
3use crate::subscription::{SubscriptionOptions, TagSubscription};
4use crate::tag_group::{
5 TagGroupConfig, TagGroupEvent, TagGroupEventKind, TagGroupFailureDiagnostic, TagGroupSnapshot,
6 TagGroupSubscription, TagGroupValueResult,
7};
8use crate::types::PlcValue;
9
10impl EipClient {
11 pub async fn subscribe_to_tag(
23 &self,
24 tag_path: &str,
25 options: SubscriptionOptions,
26 ) -> Result<TagSubscription> {
27 let subscription = TagSubscription::new(tag_path.to_string(), options.clone());
28 let mut validation_client = self.clone();
29 let initial_value = validation_client.read_tag(tag_path).await?;
30 subscription.update_value(&initial_value).await?;
31
32 let mut subscriptions = self.subscriptions.lock().await;
33 let update_rate_ms = options.update_rate;
34 subscriptions.push(subscription.clone());
35 drop(subscriptions);
36
37 let tag_path = tag_path.to_string();
38 let mut client = self.clone();
39 tokio::spawn(async move {
40 let interval = std::time::Duration::from_millis(update_rate_ms as u64);
41 loop {
42 match client.read_tag(&tag_path).await {
43 Ok(value) => {
44 if let Err(e) = client.update_subscription(&tag_path, &value).await {
45 tracing::error!("Error updating subscription: {}", e);
46 break;
47 }
48 }
49 Err(e) => {
50 tracing::error!("Error reading tag {}: {}", tag_path, e);
51 break;
52 }
53 }
54 tokio::time::sleep(interval).await;
55 }
56 });
57 Ok(subscription)
58 }
59
60 pub async fn subscribe_to_tags(
62 &self,
63 tags: &[(&str, SubscriptionOptions)],
64 ) -> Result<Vec<TagSubscription>> {
65 let mut subs = Vec::with_capacity(tags.len());
66 for (tag_name, options) in tags {
67 subs.push(self.subscribe_to_tag(tag_name, options.clone()).await?);
68 }
69 Ok(subs)
70 }
71
72 pub async fn upsert_tag_group(
77 &self,
78 group_name: &str,
79 tags: &[&str],
80 update_rate_ms: u32,
81 ) -> Result<()> {
82 if group_name.trim().is_empty() {
83 return Err(EtherNetIpError::Protocol(
84 "Tag group name cannot be empty".to_string(),
85 ));
86 }
87 if tags.is_empty() {
88 return Err(EtherNetIpError::Protocol(
89 "Tag group must contain at least one tag".to_string(),
90 ));
91 }
92 if update_rate_ms == 0 {
93 return Err(EtherNetIpError::Protocol(
94 "Tag group update rate must be greater than 0 ms".to_string(),
95 ));
96 }
97
98 let config = TagGroupConfig {
99 name: group_name.to_string(),
100 tags: tags.iter().map(|s| (*s).to_string()).collect(),
101 update_rate_ms,
102 };
103
104 let mut groups = self.tag_groups.lock().await;
105 groups.insert(group_name.to_string(), config);
106 Ok(())
107 }
108
109 pub async fn remove_tag_group(&self, group_name: &str) -> bool {
111 let mut groups = self.tag_groups.lock().await;
112 groups.remove(group_name).is_some()
113 }
114
115 pub async fn list_tag_groups(&self) -> Vec<TagGroupConfig> {
117 let groups = self.tag_groups.lock().await;
118 groups.values().cloned().collect()
119 }
120
121 pub async fn read_tag_group_once(&self, group_name: &str) -> Result<TagGroupSnapshot> {
123 let config = {
124 let groups = self.tag_groups.lock().await;
125 groups.get(group_name).cloned().ok_or_else(|| {
126 EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
127 })?
128 };
129
130 let mut client = self.clone();
131 let tag_refs: Vec<&str> = config.tags.iter().map(String::as_str).collect();
132 let values = client.read_tags_batch(&tag_refs).await?;
133
134 let mapped = values
135 .into_iter()
136 .map(|(tag_name, result)| match result {
137 Ok(value) => TagGroupValueResult {
138 tag_name,
139 value: Some(value),
140 error: None,
141 },
142 Err(e) => TagGroupValueResult {
143 tag_name,
144 value: None,
145 error: Some(e.to_string()),
146 },
147 })
148 .collect();
149
150 Ok(TagGroupSnapshot {
151 group_name: config.name,
152 sampled_at: std::time::SystemTime::now(),
153 values: mapped,
154 })
155 }
156
157 pub async fn subscribe_tag_group(&self, group_name: &str) -> Result<TagGroupSubscription> {
161 let config = {
162 let groups = self.tag_groups.lock().await;
163 groups.get(group_name).cloned().ok_or_else(|| {
164 EtherNetIpError::Protocol(format!("Tag group '{}' is not registered", group_name))
165 })?
166 };
167
168 let subscription = TagGroupSubscription::new(config.name.clone(), config.update_rate_ms);
169 let subscription_task = subscription.clone();
170 let mut client = self.clone();
171 let tags = config.tags.clone();
172 let interval = std::time::Duration::from_millis(config.update_rate_ms as u64);
173 let group_name_owned = config.name.clone();
174
175 tokio::spawn(async move {
176 while subscription_task.is_active() {
177 let tag_refs: Vec<&str> = tags.iter().map(String::as_str).collect();
178 match client.read_tags_batch(&tag_refs).await {
179 Ok(values) => {
180 let has_errors = values.iter().any(|(_, result)| result.is_err());
181 let snapshot = TagGroupSnapshot {
182 group_name: group_name_owned.clone(),
183 sampled_at: std::time::SystemTime::now(),
184 values: values
185 .into_iter()
186 .map(|(tag_name, result)| match result {
187 Ok(value) => TagGroupValueResult {
188 tag_name,
189 value: Some(value),
190 error: None,
191 },
192 Err(e) => TagGroupValueResult {
193 tag_name,
194 value: None,
195 error: Some(e.to_string()),
196 },
197 })
198 .collect(),
199 };
200
201 let event = TagGroupEvent {
202 kind: if has_errors {
203 TagGroupEventKind::PartialError
204 } else {
205 TagGroupEventKind::Data
206 },
207 snapshot,
208 error: None,
209 failure: None,
210 };
211
212 if let Err(e) = subscription_task.publish_event(event).await {
213 tracing::error!(
214 "Tag group '{}' publish failed: {}",
215 group_name_owned,
216 e
217 );
218 break;
219 }
220 }
221 Err(e) => {
222 tracing::error!(
223 "Tag group '{}' polling read failed: {}",
224 group_name_owned,
225 e
226 );
227 let failure_event = TagGroupEvent {
228 kind: TagGroupEventKind::ReadFailure,
229 snapshot: TagGroupSnapshot {
230 group_name: group_name_owned.clone(),
231 sampled_at: std::time::SystemTime::now(),
232 values: Vec::new(),
233 },
234 error: Some(e.to_string()),
235 failure: Some(TagGroupFailureDiagnostic::from_error(&e)),
236 };
237 if let Err(publish_error) =
238 subscription_task.publish_event(failure_event).await
239 {
240 tracing::error!(
241 "Tag group '{}' failure-event publish failed: {}",
242 group_name_owned,
243 publish_error
244 );
245 break;
246 }
247 }
248 }
249 tokio::time::sleep(interval).await;
250 }
251 });
252
253 Ok(subscription)
254 }
255
256 async fn update_subscription(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
257 let subscriptions = {
258 let subscriptions = self.subscriptions.lock().await;
259 subscriptions.clone()
260 };
261 for subscription in &subscriptions {
262 if subscription.tag_path == tag_name && subscription.is_active() {
263 subscription.update_value(value).await?;
264 }
265 }
266 Ok(())
267 }
268}