1use crate::PlcValue;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::SystemTime;
5use tokio::sync::{mpsc, Mutex};
6
7#[derive(Debug, Clone)]
9pub struct TagGroupConfig {
10 pub name: String,
11 pub tags: Vec<String>,
12 pub update_rate_ms: u32,
13}
14
15#[derive(Debug, Clone)]
17pub struct TagGroupValueResult {
18 pub tag_name: String,
19 pub value: Option<PlcValue>,
20 pub error: Option<String>,
21}
22
23#[derive(Debug, Clone)]
25pub struct TagGroupSnapshot {
26 pub group_name: String,
27 pub sampled_at: SystemTime,
28 pub values: Vec<TagGroupValueResult>,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub enum TagGroupEventKind {
34 Data,
35 PartialError,
36 ReadFailure,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum TagGroupFailureCategory {
42 Network,
43 Timeout,
44 PlcStatus,
45 Protocol,
46 Permission,
47 Tag,
48 Data,
49 Other,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct TagGroupFailureDiagnostic {
55 pub category: TagGroupFailureCategory,
56 pub retriable: bool,
57 pub status_code: Option<u8>,
58}
59
60impl TagGroupFailureDiagnostic {
61 pub fn from_error(error: &crate::EtherNetIpError) -> Self {
62 use crate::EtherNetIpError;
63
64 let (category, status_code) = match error {
65 EtherNetIpError::Timeout(_) => (TagGroupFailureCategory::Timeout, None),
66 EtherNetIpError::Io(_)
67 | EtherNetIpError::Connection(_)
68 | EtherNetIpError::ConnectionLost(_) => (TagGroupFailureCategory::Network, None),
69 EtherNetIpError::CipError { code, .. } => {
70 (TagGroupFailureCategory::PlcStatus, Some(*code))
71 }
72 EtherNetIpError::ReadError { status, .. }
73 | EtherNetIpError::WriteError { status, .. }
74 | EtherNetIpError::StringReadError { status, .. }
75 | EtherNetIpError::StringWriteError { status, .. } => {
76 (TagGroupFailureCategory::PlcStatus, Some(*status))
77 }
78 EtherNetIpError::Permission(_) => (TagGroupFailureCategory::Permission, None),
79 EtherNetIpError::TagNotFound(_) | EtherNetIpError::Tag(_) => {
80 (TagGroupFailureCategory::Tag, None)
81 }
82 EtherNetIpError::DataTypeMismatch { .. }
83 | EtherNetIpError::Udt(_)
84 | EtherNetIpError::StringTooLong { .. }
85 | EtherNetIpError::InvalidString { .. } => (TagGroupFailureCategory::Data, None),
86 EtherNetIpError::Protocol(_)
87 | EtherNetIpError::InvalidResponse { .. }
88 | EtherNetIpError::InvalidStringResponse { .. }
89 | EtherNetIpError::Subscription(_)
90 | EtherNetIpError::Utf8(_) => (TagGroupFailureCategory::Protocol, None),
91 EtherNetIpError::Other(_) => (TagGroupFailureCategory::Other, None),
92 };
93
94 Self {
95 category,
96 retriable: error.is_retriable(),
97 status_code,
98 }
99 }
100}
101
102#[derive(Debug, Clone)]
104pub struct TagGroupEvent {
105 pub kind: TagGroupEventKind,
106 pub snapshot: TagGroupSnapshot,
107 pub error: Option<String>,
108 pub failure: Option<TagGroupFailureDiagnostic>,
109}
110
111#[derive(Debug, Clone)]
113pub struct TagGroupSubscription {
114 pub group_name: String,
115 pub update_rate_ms: u32,
116 is_active: Arc<AtomicBool>,
117 sender: Arc<Mutex<mpsc::Sender<TagGroupEvent>>>,
118 receiver: Arc<Mutex<mpsc::Receiver<TagGroupEvent>>>,
119}
120
121impl TagGroupSubscription {
122 pub fn new(group_name: String, update_rate_ms: u32) -> Self {
123 let (sender, receiver) = mpsc::channel(64);
124 Self {
125 group_name,
126 update_rate_ms,
127 is_active: Arc::new(AtomicBool::new(true)),
128 sender: Arc::new(Mutex::new(sender)),
129 receiver: Arc::new(Mutex::new(receiver)),
130 }
131 }
132
133 pub fn is_active(&self) -> bool {
134 self.is_active.load(Ordering::Relaxed)
135 }
136
137 pub fn stop(&self) {
138 self.is_active.store(false, Ordering::Relaxed);
139 }
140
141 pub async fn publish(&self, snapshot: TagGroupSnapshot) -> Result<(), String> {
142 let event = TagGroupEvent {
143 kind: if snapshot.values.iter().any(|v| v.error.is_some()) {
144 TagGroupEventKind::PartialError
145 } else {
146 TagGroupEventKind::Data
147 },
148 snapshot,
149 error: None,
150 failure: None,
151 };
152 self.publish_event(event).await
153 }
154
155 pub async fn publish_event(&self, event: TagGroupEvent) -> Result<(), String> {
156 let sender = self.sender.lock().await;
157 sender.send(event).await.map_err(|e| e.to_string())
158 }
159
160 pub async fn wait_for_update(&self) -> Option<TagGroupEvent> {
161 let mut receiver = self.receiver.lock().await;
162 receiver.recv().await
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use crate::EtherNetIpError;
170
171 #[test]
172 fn maps_cip_status_failure_diagnostic() {
173 let diagnostic = TagGroupFailureDiagnostic::from_error(&EtherNetIpError::CipError {
174 code: 0x05,
175 message: "Path destination unknown".to_string(),
176 });
177 assert_eq!(diagnostic.category, TagGroupFailureCategory::PlcStatus);
178 assert_eq!(diagnostic.status_code, Some(0x05));
179 assert!(!diagnostic.retriable);
180 }
181
182 #[test]
183 fn maps_timeout_failure_diagnostic_as_retriable() {
184 let diagnostic = TagGroupFailureDiagnostic::from_error(&EtherNetIpError::Timeout(
185 std::time::Duration::from_secs(2),
186 ));
187 assert_eq!(diagnostic.category, TagGroupFailureCategory::Timeout);
188 assert_eq!(diagnostic.status_code, None);
189 assert!(diagnostic.retriable);
190 }
191
192 #[tokio::test]
193 async fn publish_assigns_partial_error_kind() {
194 let sub = TagGroupSubscription::new("group".to_string(), 100);
195 let snapshot = TagGroupSnapshot {
196 group_name: "group".to_string(),
197 sampled_at: SystemTime::now(),
198 values: vec![TagGroupValueResult {
199 tag_name: "Tag1".to_string(),
200 value: None,
201 error: Some("Read failed".to_string()),
202 }],
203 };
204
205 sub.publish(snapshot).await.expect("publish should succeed");
206 let event = sub.wait_for_update().await.expect("event should exist");
207
208 assert_eq!(event.kind, TagGroupEventKind::PartialError);
209 assert!(event.error.is_none());
210 assert!(event.failure.is_none());
211 }
212
213 #[tokio::test]
214 async fn publish_assigns_data_kind_when_all_values_are_ok() {
215 let sub = TagGroupSubscription::new("group".to_string(), 100);
216 let snapshot = TagGroupSnapshot {
217 group_name: "group".to_string(),
218 sampled_at: SystemTime::now(),
219 values: vec![TagGroupValueResult {
220 tag_name: "Tag1".to_string(),
221 value: Some(crate::PlcValue::Dint(42)),
222 error: None,
223 }],
224 };
225
226 sub.publish(snapshot).await.expect("publish should succeed");
227 let event = sub.wait_for_update().await.expect("event should exist");
228
229 assert_eq!(event.kind, TagGroupEventKind::Data);
230 assert!(event.error.is_none());
231 assert!(event.failure.is_none());
232 }
233
234 #[tokio::test]
235 async fn publish_event_preserves_read_failure_diagnostics() {
236 let sub = TagGroupSubscription::new("group".to_string(), 100);
237 let event = TagGroupEvent {
238 kind: TagGroupEventKind::ReadFailure,
239 snapshot: TagGroupSnapshot {
240 group_name: "group".to_string(),
241 sampled_at: SystemTime::now(),
242 values: Vec::new(),
243 },
244 error: Some("timeout while reading tag group".to_string()),
245 failure: Some(TagGroupFailureDiagnostic {
246 category: TagGroupFailureCategory::Timeout,
247 retriable: true,
248 status_code: None,
249 }),
250 };
251
252 sub.publish_event(event.clone())
253 .await
254 .expect("publish_event should succeed");
255 let received = sub.wait_for_update().await.expect("event should exist");
256
257 assert_eq!(received.kind, TagGroupEventKind::ReadFailure);
258 assert_eq!(received.error, event.error);
259 assert_eq!(received.failure, event.failure);
260 }
261
262 #[test]
263 fn stop_marks_subscription_inactive() {
264 let sub = TagGroupSubscription::new("group".to_string(), 100);
265 assert!(sub.is_active());
266 sub.stop();
267 assert!(!sub.is_active());
268 }
269}