rust_ethernet_ip/
tag_subscription.rs1use crate::error::{EtherNetIpError, Result};
2use crate::PlcValue;
3use std::sync::atomic::AtomicBool;
4use std::sync::Arc;
5use tokio::sync::{mpsc, Mutex};
6
/// Tuning knobs attached to a single tag subscription.
///
/// NOTE(review): the units of `update_rate` and `timeout` are not visible in
/// this file (presumably milliseconds) — confirm against the polling code.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Requested update rate for the subscribed tag.
    pub update_rate: u32,

    /// Minimum absolute difference between two floating-point readings for
    /// the newer one to count as a change.
    pub change_threshold: f32,

    /// Timeout associated with the subscription.
    pub timeout: u32,
}
17
18impl Default for SubscriptionOptions {
19 fn default() -> Self {
20 Self {
21 update_rate: 100, change_threshold: 0.001, timeout: 5000, }
25 }
26}
27
28#[derive(Debug, Clone)]
30pub struct TagSubscription {
31 pub tag_path: String,
33 pub options: SubscriptionOptions,
35 pub last_value: Arc<Mutex<Option<PlcValue>>>,
37 pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
39 pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
41 pub is_active: Arc<AtomicBool>,
43}
44
45impl TagSubscription {
46 pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
48 let (sender, receiver) = mpsc::channel(100); Self {
50 tag_path: tag_name,
51 options,
52 last_value: Arc::new(Mutex::new(None)),
53 sender: Arc::new(Mutex::new(sender)),
54 receiver: Arc::new(Mutex::new(receiver)),
55 is_active: Arc::new(AtomicBool::new(true)),
56 }
57 }
58
59 pub fn is_active(&self) -> bool {
61 self.is_active.load(std::sync::atomic::Ordering::Relaxed)
62 }
63
64 pub fn stop(&self) {
66 self.is_active
67 .store(false, std::sync::atomic::Ordering::Relaxed);
68 }
69
70 pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
72 let mut last_value = self.last_value.lock().await;
73
74 if let Some(old) = last_value.as_ref() {
76 if !Self::value_changed(old, value, self.options.change_threshold) {
77 return Ok(());
78 }
79 }
80
81 *last_value = Some(value.clone());
83 let sender = self.sender.lock().await;
84 sender
85 .send(value.clone())
86 .await
87 .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
88
89 Ok(())
90 }
91
92 fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
96 match (old, new) {
97 (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
98 (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
99 (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
100 (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
101 (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
102 (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
103 (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
104 (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
105 (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
106 (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
107 (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
108 (PlcValue::String(o), PlcValue::String(n)) => o != n,
109 _ => true,
111 }
112 }
113
114 pub async fn wait_for_update(&self) -> Result<PlcValue> {
116 let mut receiver = self.receiver.lock().await;
117 receiver
118 .recv()
119 .await
120 .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
121 }
122
123 pub async fn get_last_value(&self) -> Option<PlcValue> {
125 self.last_value.lock().await.clone()
126 }
127}
128
129#[derive(Debug, Clone)]
131pub struct SubscriptionManager {
132 subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
133}
134
135impl Default for SubscriptionManager {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl SubscriptionManager {
142 pub fn new() -> Self {
144 Self {
145 subscriptions: Arc::new(Mutex::new(Vec::new())),
146 }
147 }
148
149 pub async fn add_subscription(&self, subscription: TagSubscription) {
151 let mut subscriptions = self.subscriptions.lock().await;
152 subscriptions.push(subscription);
153 }
154
155 pub async fn remove_subscription(&self, tag_name: &str) {
157 let mut subscriptions = self.subscriptions.lock().await;
158 subscriptions.retain(|sub| sub.tag_path != tag_name);
159 }
160
161 pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
163 let subscriptions = self.subscriptions.lock().await;
164 for subscription in subscriptions.iter() {
165 if subscription.tag_path == tag_name && subscription.is_active() {
166 subscription.update_value(value).await?;
167 }
168 }
169 Ok(())
170 }
171
172 pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
174 let subscriptions = self.subscriptions.lock().await;
175 subscriptions.clone()
176 }
177
178 pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
180 let subscriptions = self.subscriptions.lock().await;
181 subscriptions
182 .iter()
183 .find(|sub| sub.tag_path == tag_name)
184 .cloned()
185 }
186}