Skip to main content

rust_ethernet_ip/
tag_subscription.rs

1use crate::error::{EtherNetIpError, Result};
2use crate::PlcValue;
3use std::sync::atomic::AtomicBool;
4use std::sync::Arc;
5use tokio::sync::{mpsc, Mutex};
6
7/// Configuration options for tag subscriptions
8#[derive(Debug, Clone)]
9pub struct SubscriptionOptions {
10    /// Update rate in milliseconds
11    pub update_rate: u32,
12    /// Change threshold for numeric values
13    pub change_threshold: f32,
14    /// Timeout in milliseconds
15    pub timeout: u32,
16}
17
18impl Default for SubscriptionOptions {
19    fn default() -> Self {
20        Self {
21            update_rate: 100,        // 100ms default update rate
22            change_threshold: 0.001, // 0.1% change threshold
23            timeout: 5000,           // 5 second timeout
24        }
25    }
26}
27
28/// Represents a subscription to a PLC tag
29#[derive(Debug, Clone)]
30pub struct TagSubscription {
31    /// The path of the subscribed tag
32    pub tag_path: String,
33    /// Subscription configuration
34    pub options: SubscriptionOptions,
35    /// Last received value
36    pub last_value: Arc<Mutex<Option<PlcValue>>>,
37    /// Channel sender for value updates
38    pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
39    /// Channel receiver for value updates
40    pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
41    /// Whether the subscription is active
42    pub is_active: Arc<AtomicBool>,
43}
44
45impl TagSubscription {
46    /// Creates a new tag subscription
47    pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
48        let (sender, receiver) = mpsc::channel(100); // Buffer size of 100
49        Self {
50            tag_path: tag_name,
51            options,
52            last_value: Arc::new(Mutex::new(None)),
53            sender: Arc::new(Mutex::new(sender)),
54            receiver: Arc::new(Mutex::new(receiver)),
55            is_active: Arc::new(AtomicBool::new(true)),
56        }
57    }
58
59    /// Checks if the subscription is active
60    pub fn is_active(&self) -> bool {
61        self.is_active.load(std::sync::atomic::Ordering::Relaxed)
62    }
63
64    /// Stops the subscription
65    pub fn stop(&self) {
66        self.is_active
67            .store(false, std::sync::atomic::Ordering::Relaxed);
68    }
69
70    /// Updates the subscription value
71    pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
72        let mut last_value = self.last_value.lock().await;
73
74        // Check if value has changed enough to notify
75        if let Some(old) = last_value.as_ref() {
76            if !Self::value_changed(old, value, self.options.change_threshold) {
77                return Ok(());
78            }
79        }
80
81        // Update value and send notification
82        *last_value = Some(value.clone());
83        let sender = self.sender.lock().await;
84        sender
85            .send(value.clone())
86            .await
87            .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
88
89        Ok(())
90    }
91
92    /// Checks whether a value has changed enough to warrant a notification.
93    /// For floating-point types, uses the change_threshold as a deadband.
94    /// For all other types, triggers on any change.
95    fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
96        match (old, new) {
97            (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
98            (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
99            (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
100            (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
101            (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
102            (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
103            (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
104            (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
105            (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
106            (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
107            (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
108            (PlcValue::String(o), PlcValue::String(n)) => o != n,
109            // Different types or UDTs — always notify
110            _ => true,
111        }
112    }
113
114    /// Waits for the next value update
115    pub async fn wait_for_update(&self) -> Result<PlcValue> {
116        let mut receiver = self.receiver.lock().await;
117        receiver
118            .recv()
119            .await
120            .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
121    }
122
123    /// Gets the last value received
124    pub async fn get_last_value(&self) -> Option<PlcValue> {
125        self.last_value.lock().await.clone()
126    }
127}
128
129/// Manages multiple tag subscriptions
130#[derive(Debug, Clone)]
131pub struct SubscriptionManager {
132    subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
133}
134
135impl Default for SubscriptionManager {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl SubscriptionManager {
142    /// Creates a new subscription manager
143    pub fn new() -> Self {
144        Self {
145            subscriptions: Arc::new(Mutex::new(Vec::new())),
146        }
147    }
148
149    /// Adds a new subscription
150    pub async fn add_subscription(&self, subscription: TagSubscription) {
151        let mut subscriptions = self.subscriptions.lock().await;
152        subscriptions.push(subscription);
153    }
154
155    /// Removes a subscription
156    pub async fn remove_subscription(&self, tag_name: &str) {
157        let mut subscriptions = self.subscriptions.lock().await;
158        subscriptions.retain(|sub| sub.tag_path != tag_name);
159    }
160
161    /// Updates a value for all matching subscriptions
162    pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
163        let subscriptions = self.subscriptions.lock().await;
164        for subscription in subscriptions.iter() {
165            if subscription.tag_path == tag_name && subscription.is_active() {
166                subscription.update_value(value).await?;
167            }
168        }
169        Ok(())
170    }
171
172    /// Gets all active subscriptions
173    pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
174        let subscriptions = self.subscriptions.lock().await;
175        subscriptions.clone()
176    }
177
178    /// Gets a specific subscription by tag name
179    pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
180        let subscriptions = self.subscriptions.lock().await;
181        subscriptions
182            .iter()
183            .find(|sub| sub.tag_path == tag_name)
184            .cloned()
185    }
186}