rust_ethernet_ip/
subscription.rs1use crate::error::{EtherNetIpError, Result};
2use crate::PlcValue;
3use std::sync::atomic::AtomicBool;
4use std::sync::Arc;
5use tokio::sync::{mpsc, Mutex};
6
7#[derive(Debug, Clone)]
9pub struct SubscriptionOptions {
10    pub update_rate: u32,
12    pub change_threshold: f32,
14    pub timeout: u32,
16}
17
18impl Default for SubscriptionOptions {
19    fn default() -> Self {
20        Self {
21            update_rate: 100,        change_threshold: 0.001, timeout: 5000,           }
25    }
26}
27
28#[derive(Debug, Clone)]
30pub struct TagSubscription {
31    pub tag_path: String,
33    pub options: SubscriptionOptions,
35    pub last_value: Arc<Mutex<Option<PlcValue>>>,
37    pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
39    pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
41    pub is_active: Arc<AtomicBool>,
43}
44
45impl TagSubscription {
46    pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
48        let (sender, receiver) = mpsc::channel(100); Self {
50            tag_path: tag_name,
51            options,
52            last_value: Arc::new(Mutex::new(None)),
53            sender: Arc::new(Mutex::new(sender)),
54            receiver: Arc::new(Mutex::new(receiver)),
55            is_active: Arc::new(AtomicBool::new(true)),
56        }
57    }
58
59    pub fn is_active(&self) -> bool {
61        self.is_active.load(std::sync::atomic::Ordering::Relaxed)
62    }
63
64    pub fn stop(&self) {
66        self.is_active
67            .store(false, std::sync::atomic::Ordering::Relaxed);
68    }
69
70    pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
72        let mut last_value = self.last_value.lock().await;
73
74        if let (Some(PlcValue::Real(old)), PlcValue::Real(new)) = (last_value.as_ref(), value) {
76            if (*new - *old).abs() < self.options.change_threshold {
77                return Ok(());
78            }
79        }
80
81        *last_value = Some(value.clone());
83        let sender = self.sender.lock().await;
84        sender
85            .send(value.clone())
86            .await
87            .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {}", e)))?;
88
89        Ok(())
90    }
91
92    pub async fn wait_for_update(&self) -> Result<PlcValue> {
94        let mut receiver = self.receiver.lock().await;
95        receiver
96            .recv()
97            .await
98            .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
99    }
100
101    pub async fn get_last_value(&self) -> Option<PlcValue> {
103        self.last_value.lock().await.clone()
104    }
105}
106
107#[derive(Debug, Clone)]
109pub struct SubscriptionManager {
110    subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
111}
112
113impl Default for SubscriptionManager {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119impl SubscriptionManager {
120    pub fn new() -> Self {
122        Self {
123            subscriptions: Arc::new(Mutex::new(Vec::new())),
124        }
125    }
126
127    pub async fn add_subscription(&self, subscription: TagSubscription) {
129        let mut subscriptions = self.subscriptions.lock().await;
130        subscriptions.push(subscription);
131    }
132
133    pub async fn remove_subscription(&self, tag_name: &str) {
135        let mut subscriptions = self.subscriptions.lock().await;
136        subscriptions.retain(|sub| sub.tag_path != tag_name);
137    }
138
139    pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
141        let subscriptions = self.subscriptions.lock().await;
142        for subscription in subscriptions.iter() {
143            if subscription.tag_path == tag_name && subscription.is_active() {
144                subscription.update_value(value).await?;
145            }
146        }
147        Ok(())
148    }
149
150    pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
152        let subscriptions = self.subscriptions.lock().await;
153        subscriptions.clone()
154    }
155
156    pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
158        let subscriptions = self.subscriptions.lock().await;
159        subscriptions
160            .iter()
161            .find(|sub| sub.tag_path == tag_name)
162            .cloned()
163    }
164}