// rust_ethernet_ip/subscription.rs
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

use crate::error::{EtherNetIpError, Result};
use crate::PlcValue;
6
/// Tuning parameters attached to a single tag subscription.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Requested update rate (defaults to `100`).
    ///
    /// NOTE(review): the unit is presumably milliseconds — confirm against
    /// the code that consumes this value.
    pub update_rate: u32,
    /// Minimum absolute difference between two consecutive `Real` values
    /// for the newer one to be forwarded to subscribers (defaults to `0.001`).
    pub change_threshold: f32,
    /// Timeout for the subscription (defaults to `5000`).
    ///
    /// NOTE(review): the unit is presumably milliseconds — confirm against
    /// the code that consumes this value.
    pub timeout: u32,
}

impl Default for SubscriptionOptions {
    /// Builds the default option set: rate `100`, threshold `0.001`, timeout `5000`.
    fn default() -> Self {
        let update_rate = 100;
        let change_threshold = 0.001;
        let timeout = 5000;

        SubscriptionOptions {
            update_rate,
            change_threshold,
            timeout,
        }
    }
}
27
28#[derive(Debug, Clone)]
30pub struct TagSubscription {
31 pub tag_path: String,
33 pub options: SubscriptionOptions,
35 pub last_value: Arc<Mutex<Option<PlcValue>>>,
37 pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
39 pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
41 pub is_active: Arc<AtomicBool>,
43}
44
45impl TagSubscription {
46 pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
48 let (sender, receiver) = mpsc::channel(100); Self {
50 tag_path: tag_name,
51 options,
52 last_value: Arc::new(Mutex::new(None)),
53 sender: Arc::new(Mutex::new(sender)),
54 receiver: Arc::new(Mutex::new(receiver)),
55 is_active: Arc::new(AtomicBool::new(true)),
56 }
57 }
58
59 pub fn is_active(&self) -> bool {
61 self.is_active.load(std::sync::atomic::Ordering::Relaxed)
62 }
63
64 pub fn stop(&self) {
66 self.is_active.store(false, std::sync::atomic::Ordering::Relaxed);
67 }
68
69 pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
71 let mut last_value = self.last_value.lock().await;
72
73 if let (Some(PlcValue::Real(old)), PlcValue::Real(new)) = (last_value.as_ref(), value) {
75 if (*new - *old).abs() < self.options.change_threshold {
76 return Ok(());
77 }
78 }
79
80 *last_value = Some(value.clone());
82 let sender = self.sender.lock().await;
83 sender.send(value.clone()).await
84 .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {}", e)))?;
85
86 Ok(())
87 }
88
89 pub async fn wait_for_update(&self) -> Result<PlcValue> {
91 let mut receiver = self.receiver.lock().await;
92 receiver.recv().await
93 .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
94 }
95
96 pub async fn get_last_value(&self) -> Option<PlcValue> {
98 self.last_value.lock().await.clone()
99 }
100}
101
102#[derive(Debug, Clone)]
104pub struct SubscriptionManager {
105 subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
106}
107
108impl Default for SubscriptionManager {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl SubscriptionManager {
115 pub fn new() -> Self {
117 Self {
118 subscriptions: Arc::new(Mutex::new(Vec::new())),
119 }
120 }
121
122 pub async fn add_subscription(&self, subscription: TagSubscription) {
124 let mut subscriptions = self.subscriptions.lock().await;
125 subscriptions.push(subscription);
126 }
127
128 pub async fn remove_subscription(&self, tag_name: &str) {
130 let mut subscriptions = self.subscriptions.lock().await;
131 subscriptions.retain(|sub| sub.tag_path != tag_name);
132 }
133
134 pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
136 let subscriptions = self.subscriptions.lock().await;
137 for subscription in subscriptions.iter() {
138 if subscription.tag_path == tag_name && subscription.is_active() {
139 subscription.update_value(value).await?;
140 }
141 }
142 Ok(())
143 }
144
145 pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
147 let subscriptions = self.subscriptions.lock().await;
148 subscriptions.clone()
149 }
150
151 pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
153 let subscriptions = self.subscriptions.lock().await;
154 subscriptions.iter()
155 .find(|sub| sub.tag_path == tag_name)
156 .cloned()
157 }
158}