rust_ethernet_ip/
subscription.rs1use crate::PlcValue;
2use crate::error::{EtherNetIpError, Result};
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use tokio::sync::{Mutex, mpsc};
6
7use futures::{Stream, stream};
8
/// Tuning knobs for a single tag subscription.
///
/// The units of `update_rate` and `timeout` are not stated in this file;
/// the defaults (100 and 5000) suggest milliseconds — TODO confirm against
/// the code that consumes these fields.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Requested update rate (presumably a polling interval in milliseconds —
    /// TODO confirm).
    pub update_rate: u32,
    /// Minimum absolute difference between two floating-point samples for the
    /// new sample to count as a change. Only consulted for `Real`/`Lreal`
    /// values; every other value type is compared for plain inequality.
    pub change_threshold: f32,
    /// Timeout associated with the subscription (presumably milliseconds —
    /// TODO confirm).
    pub timeout: u32,
}
19
20impl Default for SubscriptionOptions {
21 fn default() -> Self {
22 Self {
23 update_rate: 100, change_threshold: 0.001, timeout: 5000, }
27 }
28}
29
30#[derive(Debug, Clone)]
32pub struct TagSubscription {
33 pub tag_path: String,
35 pub options: SubscriptionOptions,
37 pub last_value: Arc<Mutex<Option<PlcValue>>>,
39 pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
41 pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
43 pub is_active: Arc<AtomicBool>,
45}
46
47impl TagSubscription {
48 pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
50 let (sender, receiver) = mpsc::channel(100); Self {
52 tag_path: tag_name,
53 options,
54 last_value: Arc::new(Mutex::new(None)),
55 sender: Arc::new(Mutex::new(sender)),
56 receiver: Arc::new(Mutex::new(receiver)),
57 is_active: Arc::new(AtomicBool::new(true)),
58 }
59 }
60
61 pub fn is_active(&self) -> bool {
63 self.is_active.load(std::sync::atomic::Ordering::Relaxed)
64 }
65
66 pub fn stop(&self) {
68 self.is_active
69 .store(false, std::sync::atomic::Ordering::Relaxed);
70 }
71
72 pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
74 let mut last_value = self.last_value.lock().await;
75
76 if let Some(old) = last_value.as_ref()
78 && !Self::value_changed(old, value, self.options.change_threshold)
79 {
80 return Ok(());
81 }
82
83 *last_value = Some(value.clone());
85 drop(last_value);
86 let sender = {
87 let sender = self.sender.lock().await;
88 sender.clone()
89 };
90 sender
91 .send(value.clone())
92 .await
93 .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
94
95 Ok(())
96 }
97
98 fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
102 match (old, new) {
103 (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
104 (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
105 (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
106 (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
107 (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
108 (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
109 (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
110 (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
111 (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
112 (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
113 (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
114 (PlcValue::String(o), PlcValue::String(n)) => o != n,
115 _ => true,
117 }
118 }
119
120 pub async fn wait_for_update(&self) -> Result<PlcValue> {
122 let mut receiver = self.receiver.lock().await;
123 let next_value = receiver.recv().await;
124 drop(receiver);
125 next_value.ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
126 }
127
128 pub async fn get_last_value(&self) -> Option<PlcValue> {
130 self.last_value.lock().await.clone()
131 }
132
133 async fn recv_next_value(&self) -> Option<PlcValue> {
134 let mut receiver = self.receiver.lock().await;
135 let next_value = receiver.recv().await;
136 drop(receiver);
137 next_value
138 }
139
140 pub fn into_stream(self: Arc<Self>) -> impl Stream<Item = PlcValue> + Send {
157 stream::unfold(self, |subscription| async move {
158 let next_value = subscription.recv_next_value().await;
159 next_value.map(|plc_value| (plc_value, subscription))
160 })
161 }
162}
163
164#[derive(Debug, Clone)]
166pub struct SubscriptionManager {
167 subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
168}
169
170impl Default for SubscriptionManager {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl SubscriptionManager {
177 pub fn new() -> Self {
179 Self {
180 subscriptions: Arc::new(Mutex::new(Vec::new())),
181 }
182 }
183
184 pub async fn add_subscription(&self, subscription: TagSubscription) {
186 let mut subscriptions = self.subscriptions.lock().await;
187 subscriptions.push(subscription);
188 }
189
190 pub async fn remove_subscription(&self, tag_name: &str) {
192 let mut subscriptions = self.subscriptions.lock().await;
193 subscriptions.retain(|sub| sub.tag_path != tag_name);
194 }
195
196 pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
198 let subscriptions = {
199 let subscriptions = self.subscriptions.lock().await;
200 subscriptions.clone()
201 };
202 for subscription in &subscriptions {
203 if subscription.tag_path == tag_name && subscription.is_active() {
204 subscription.update_value(value).await?;
205 }
206 }
207 Ok(())
208 }
209
210 pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
212 let subscriptions = self.subscriptions.lock().await;
213 subscriptions.clone()
214 }
215
216 pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
218 let subscriptions = self.subscriptions.lock().await;
219 subscriptions
220 .iter()
221 .find(|sub| sub.tag_path == tag_name)
222 .cloned()
223 }
224}