Skip to main content

rust_ethernet_ip/
tag_group.rs

1use crate::PlcValue;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::SystemTime;
5use tokio::sync::{mpsc, Mutex};
6
/// Configuration for a named group of tags that share one polling interval.
#[derive(Debug, Clone)]
pub struct TagGroupConfig {
    /// Name that identifies the group.
    pub name: String,
    /// Names of the tags that belong to the group.
    pub tags: Vec<String>,
    /// Polling interval for the group, in milliseconds.
    pub update_rate_ms: u32,
}
14
15/// Per-tag result in a group snapshot.
16#[derive(Debug, Clone)]
17pub struct TagGroupValueResult {
18    pub tag_name: String,
19    pub value: Option<PlcValue>,
20    pub error: Option<String>,
21}
22
23/// Snapshot of one polling cycle for a group.
24#[derive(Debug, Clone)]
25pub struct TagGroupSnapshot {
26    pub group_name: String,
27    pub sampled_at: SystemTime,
28    pub values: Vec<TagGroupValueResult>,
29}
30
/// High-level classification of an event produced by tag-group polling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TagGroupEventKind {
    /// No per-tag result in the snapshot carries an error.
    Data,
    /// At least one per-tag result in the snapshot carries an error.
    PartialError,
    /// The read failed as a whole.
    ///
    /// NOTE(review): this kind is never assigned by `TagGroupSubscription::publish`;
    /// it is presumably set by the poller when it calls `publish_event` directly.
    ReadFailure,
}
38
/// Structured category for a tag-group polling failure.
///
/// The per-variant notes describe how `TagGroupFailureDiagnostic::from_error`
/// maps `EtherNetIpError` variants onto each category.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TagGroupFailureCategory {
    /// I/O, connection, or connection-lost errors.
    Network,
    /// The operation timed out.
    Timeout,
    /// The error carries a CIP/status code (CIP errors and read/write errors
    /// that report a status byte).
    PlcStatus,
    /// Protocol-level problems: protocol errors, invalid responses,
    /// subscription errors, and UTF-8 decoding errors.
    Protocol,
    /// Permission errors.
    Permission,
    /// Tag errors, including tag-not-found.
    Tag,
    /// Data errors: type mismatches, UDT errors, and over-long or invalid strings.
    Data,
    /// Errors that fit none of the categories above.
    Other,
}
51
52/// Structured diagnostics for read failures during tag-group polling.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct TagGroupFailureDiagnostic {
55    pub category: TagGroupFailureCategory,
56    pub retriable: bool,
57    pub status_code: Option<u8>,
58}
59
60impl TagGroupFailureDiagnostic {
61    pub fn from_error(error: &crate::EtherNetIpError) -> Self {
62        use crate::EtherNetIpError;
63
64        let (category, status_code) = match error {
65            EtherNetIpError::Timeout(_) => (TagGroupFailureCategory::Timeout, None),
66            EtherNetIpError::Io(_)
67            | EtherNetIpError::Connection(_)
68            | EtherNetIpError::ConnectionLost(_) => (TagGroupFailureCategory::Network, None),
69            EtherNetIpError::CipError { code, .. } => {
70                (TagGroupFailureCategory::PlcStatus, Some(*code))
71            }
72            EtherNetIpError::ReadError { status, .. }
73            | EtherNetIpError::WriteError { status, .. }
74            | EtherNetIpError::StringReadError { status, .. }
75            | EtherNetIpError::StringWriteError { status, .. } => {
76                (TagGroupFailureCategory::PlcStatus, Some(*status))
77            }
78            EtherNetIpError::Permission(_) => (TagGroupFailureCategory::Permission, None),
79            EtherNetIpError::TagNotFound(_) | EtherNetIpError::Tag(_) => {
80                (TagGroupFailureCategory::Tag, None)
81            }
82            EtherNetIpError::DataTypeMismatch { .. }
83            | EtherNetIpError::Udt(_)
84            | EtherNetIpError::StringTooLong { .. }
85            | EtherNetIpError::InvalidString { .. } => (TagGroupFailureCategory::Data, None),
86            EtherNetIpError::Protocol(_)
87            | EtherNetIpError::InvalidResponse { .. }
88            | EtherNetIpError::InvalidStringResponse { .. }
89            | EtherNetIpError::Subscription(_)
90            | EtherNetIpError::Utf8(_) => (TagGroupFailureCategory::Protocol, None),
91            EtherNetIpError::Other(_) => (TagGroupFailureCategory::Other, None),
92        };
93
94        Self {
95            category,
96            retriable: error.is_retriable(),
97            status_code,
98        }
99    }
100}
101
102/// Event emitted by background tag-group polling.
103#[derive(Debug, Clone)]
104pub struct TagGroupEvent {
105    pub kind: TagGroupEventKind,
106    pub snapshot: TagGroupSnapshot,
107    pub error: Option<String>,
108    pub failure: Option<TagGroupFailureDiagnostic>,
109}
110
111/// Live subscription to a tag group polling stream.
112#[derive(Debug, Clone)]
113pub struct TagGroupSubscription {
114    pub group_name: String,
115    pub update_rate_ms: u32,
116    is_active: Arc<AtomicBool>,
117    sender: Arc<Mutex<mpsc::Sender<TagGroupEvent>>>,
118    receiver: Arc<Mutex<mpsc::Receiver<TagGroupEvent>>>,
119}
120
121impl TagGroupSubscription {
122    pub fn new(group_name: String, update_rate_ms: u32) -> Self {
123        let (sender, receiver) = mpsc::channel(64);
124        Self {
125            group_name,
126            update_rate_ms,
127            is_active: Arc::new(AtomicBool::new(true)),
128            sender: Arc::new(Mutex::new(sender)),
129            receiver: Arc::new(Mutex::new(receiver)),
130        }
131    }
132
133    pub fn is_active(&self) -> bool {
134        self.is_active.load(Ordering::Relaxed)
135    }
136
137    pub fn stop(&self) {
138        self.is_active.store(false, Ordering::Relaxed);
139    }
140
141    pub async fn publish(&self, snapshot: TagGroupSnapshot) -> Result<(), String> {
142        let event = TagGroupEvent {
143            kind: if snapshot.values.iter().any(|v| v.error.is_some()) {
144                TagGroupEventKind::PartialError
145            } else {
146                TagGroupEventKind::Data
147            },
148            snapshot,
149            error: None,
150            failure: None,
151        };
152        self.publish_event(event).await
153    }
154
155    pub async fn publish_event(&self, event: TagGroupEvent) -> Result<(), String> {
156        let sender = self.sender.lock().await;
157        sender.send(event).await.map_err(|e| e.to_string())
158    }
159
160    pub async fn wait_for_update(&self) -> Option<TagGroupEvent> {
161        let mut receiver = self.receiver.lock().await;
162        receiver.recv().await
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EtherNetIpError;

    /// Builds a snapshot for group `"group"` holding a single result for `"Tag1"`.
    fn single_tag_snapshot(
        value: Option<crate::PlcValue>,
        error: Option<&str>,
    ) -> TagGroupSnapshot {
        TagGroupSnapshot {
            group_name: "group".to_string(),
            sampled_at: SystemTime::now(),
            values: vec![TagGroupValueResult {
                tag_name: "Tag1".to_string(),
                value,
                error: error.map(str::to_string),
            }],
        }
    }

    /// A CIP error maps to `PlcStatus`, keeps its code, and is not retriable.
    #[test]
    fn maps_cip_status_failure_diagnostic() {
        let diagnostic = TagGroupFailureDiagnostic::from_error(&EtherNetIpError::CipError {
            code: 0x05,
            message: "Path destination unknown".to_string(),
        });
        assert_eq!(diagnostic.category, TagGroupFailureCategory::PlcStatus);
        assert_eq!(diagnostic.status_code, Some(0x05));
        assert!(!diagnostic.retriable);
    }

    /// A timeout maps to `Timeout`, has no status code, and is retriable.
    #[test]
    fn maps_timeout_failure_diagnostic_as_retriable() {
        let diagnostic = TagGroupFailureDiagnostic::from_error(&EtherNetIpError::Timeout(
            std::time::Duration::from_secs(2),
        ));
        assert_eq!(diagnostic.category, TagGroupFailureCategory::Timeout);
        assert_eq!(diagnostic.status_code, None);
        assert!(diagnostic.retriable);
    }

    /// A snapshot with a failing tag is published as `PartialError`.
    #[tokio::test]
    async fn publish_assigns_partial_error_kind() {
        let sub = TagGroupSubscription::new("group".to_string(), 100);
        let snapshot = single_tag_snapshot(None, Some("Read failed"));

        sub.publish(snapshot).await.expect("publish should succeed");
        let event = sub.wait_for_update().await.expect("event should exist");

        assert_eq!(event.kind, TagGroupEventKind::PartialError);
        assert!(event.error.is_none());
        assert!(event.failure.is_none());
    }

    /// A snapshot without per-tag errors is published as `Data`.
    #[tokio::test]
    async fn publish_assigns_data_kind_when_all_values_are_ok() {
        let sub = TagGroupSubscription::new("group".to_string(), 100);
        let snapshot = single_tag_snapshot(Some(crate::PlcValue::Dint(42)), None);

        sub.publish(snapshot).await.expect("publish should succeed");
        let event = sub.wait_for_update().await.expect("event should exist");

        assert_eq!(event.kind, TagGroupEventKind::Data);
        assert!(event.error.is_none());
        assert!(event.failure.is_none());
    }

    /// `publish_event` delivers kind, error, and diagnostics unchanged.
    #[tokio::test]
    async fn publish_event_preserves_read_failure_diagnostics() {
        let sub = TagGroupSubscription::new("group".to_string(), 100);
        let event = TagGroupEvent {
            kind: TagGroupEventKind::ReadFailure,
            snapshot: TagGroupSnapshot {
                group_name: "group".to_string(),
                sampled_at: SystemTime::now(),
                values: Vec::new(),
            },
            error: Some("timeout while reading tag group".to_string()),
            failure: Some(TagGroupFailureDiagnostic {
                category: TagGroupFailureCategory::Timeout,
                retriable: true,
                status_code: None,
            }),
        };

        sub.publish_event(event.clone())
            .await
            .expect("publish_event should succeed");
        let received = sub.wait_for_update().await.expect("event should exist");

        assert_eq!(received.kind, TagGroupEventKind::ReadFailure);
        assert_eq!(received.error, event.error);
        assert_eq!(received.failure, event.failure);
    }

    /// `stop` flips the active flag.
    #[test]
    fn stop_marks_subscription_inactive() {
        let sub = TagGroupSubscription::new("group".to_string(), 100);
        assert!(sub.is_active());
        sub.stop();
        assert!(!sub.is_active());
    }
}