rust_ethernet_ip/
subscription.rs1use crate::error::{EtherNetIpError, Result};
2use crate::PlcValue;
3use std::sync::atomic::AtomicBool;
4use std::sync::Arc;
5use tokio::sync::{mpsc, Mutex};
6
7use async_stream::stream;
8use futures::Stream;
9
10#[derive(Debug, Clone)]
12pub struct SubscriptionOptions {
13 pub update_rate: u32,
15 pub change_threshold: f32,
17 pub timeout: u32,
19}
20
21impl Default for SubscriptionOptions {
22 fn default() -> Self {
23 Self {
24 update_rate: 100, change_threshold: 0.001, timeout: 5000, }
28 }
29}
30
31#[derive(Debug, Clone)]
33pub struct TagSubscription {
34 pub tag_path: String,
36 pub options: SubscriptionOptions,
38 pub last_value: Arc<Mutex<Option<PlcValue>>>,
40 pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
42 pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
44 pub is_active: Arc<AtomicBool>,
46}
47
48impl TagSubscription {
49 pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
51 let (sender, receiver) = mpsc::channel(100); Self {
53 tag_path: tag_name,
54 options,
55 last_value: Arc::new(Mutex::new(None)),
56 sender: Arc::new(Mutex::new(sender)),
57 receiver: Arc::new(Mutex::new(receiver)),
58 is_active: Arc::new(AtomicBool::new(true)),
59 }
60 }
61
62 pub fn is_active(&self) -> bool {
64 self.is_active.load(std::sync::atomic::Ordering::Relaxed)
65 }
66
67 pub fn stop(&self) {
69 self.is_active
70 .store(false, std::sync::atomic::Ordering::Relaxed);
71 }
72
73 pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
75 let mut last_value = self.last_value.lock().await;
76
77 if let Some(old) = last_value.as_ref() {
79 if !Self::value_changed(old, value, self.options.change_threshold) {
80 return Ok(());
81 }
82 }
83
84 *last_value = Some(value.clone());
86 let sender = self.sender.lock().await;
87 sender
88 .send(value.clone())
89 .await
90 .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
91
92 Ok(())
93 }
94
95 fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
99 match (old, new) {
100 (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
101 (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
102 (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
103 (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
104 (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
105 (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
106 (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
107 (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
108 (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
109 (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
110 (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
111 (PlcValue::String(o), PlcValue::String(n)) => o != n,
112 _ => true,
114 }
115 }
116
117 pub async fn wait_for_update(&self) -> Result<PlcValue> {
119 let mut receiver = self.receiver.lock().await;
120 receiver
121 .recv()
122 .await
123 .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
124 }
125
126 pub async fn get_last_value(&self) -> Option<PlcValue> {
128 self.last_value.lock().await.clone()
129 }
130
131 pub fn into_stream(self: Arc<Self>) -> impl Stream<Item = PlcValue> + Send {
148 stream! {
149 loop {
150 let v = {
151 let mut receiver = self.receiver.lock().await;
152 receiver.recv().await
153 };
154 match v {
155 Some(plc_value) => yield plc_value,
156 None => break,
157 }
158 }
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct SubscriptionManager {
166 subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
167}
168
169impl Default for SubscriptionManager {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
175impl SubscriptionManager {
176 pub fn new() -> Self {
178 Self {
179 subscriptions: Arc::new(Mutex::new(Vec::new())),
180 }
181 }
182
183 pub async fn add_subscription(&self, subscription: TagSubscription) {
185 let mut subscriptions = self.subscriptions.lock().await;
186 subscriptions.push(subscription);
187 }
188
189 pub async fn remove_subscription(&self, tag_name: &str) {
191 let mut subscriptions = self.subscriptions.lock().await;
192 subscriptions.retain(|sub| sub.tag_path != tag_name);
193 }
194
195 pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
197 let subscriptions = self.subscriptions.lock().await;
198 for subscription in subscriptions.iter() {
199 if subscription.tag_path == tag_name && subscription.is_active() {
200 subscription.update_value(value).await?;
201 }
202 }
203 Ok(())
204 }
205
206 pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
208 let subscriptions = self.subscriptions.lock().await;
209 subscriptions.clone()
210 }
211
212 pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
214 let subscriptions = self.subscriptions.lock().await;
215 subscriptions
216 .iter()
217 .find(|sub| sub.tag_path == tag_name)
218 .cloned()
219 }
220}