// rust_ethernet_ip/subscription.rs

1use crate::PlcValue;
2use crate::error::{EtherNetIpError, Result};
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use tokio::sync::{Mutex, mpsc};
6
7use futures::{Stream, stream};
8
/// Tuning knobs for a tag subscription.
#[derive(Debug, Clone)]
pub struct SubscriptionOptions {
    /// Update rate, in milliseconds.
    pub update_rate: u32,
    /// Deadband applied to floating-point values: `TagSubscription` compares the
    /// absolute difference between the previous and the new value against this
    /// number, so it is an absolute amount rather than a percentage.
    pub change_threshold: f32,
    /// Timeout, in milliseconds.
    pub timeout: u32,
}

impl Default for SubscriptionOptions {
    /// Returns the stock configuration: 100 ms update rate, a deadband of
    /// `0.001`, and a 5 second (5000 ms) timeout.
    fn default() -> Self {
        const UPDATE_RATE_MS: u32 = 100;
        const CHANGE_THRESHOLD: f32 = 0.001;
        const TIMEOUT_MS: u32 = 5000;

        Self {
            update_rate: UPDATE_RATE_MS,
            change_threshold: CHANGE_THRESHOLD,
            timeout: TIMEOUT_MS,
        }
    }
}
29
30/// Represents a subscription to a PLC tag
31#[derive(Debug, Clone)]
32pub struct TagSubscription {
33    /// The path of the subscribed tag
34    pub tag_path: String,
35    /// Subscription configuration
36    pub options: SubscriptionOptions,
37    /// Last received value
38    pub last_value: Arc<Mutex<Option<PlcValue>>>,
39    /// Channel sender for value updates
40    pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
41    /// Channel receiver for value updates
42    pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
43    /// Whether the subscription is active
44    pub is_active: Arc<AtomicBool>,
45}
46
47impl TagSubscription {
48    /// Creates a new tag subscription
49    pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
50        let (sender, receiver) = mpsc::channel(100); // Buffer size of 100
51        Self {
52            tag_path: tag_name,
53            options,
54            last_value: Arc::new(Mutex::new(None)),
55            sender: Arc::new(Mutex::new(sender)),
56            receiver: Arc::new(Mutex::new(receiver)),
57            is_active: Arc::new(AtomicBool::new(true)),
58        }
59    }
60
61    /// Checks if the subscription is active
62    pub fn is_active(&self) -> bool {
63        self.is_active.load(std::sync::atomic::Ordering::Relaxed)
64    }
65
66    /// Stops the subscription
67    pub fn stop(&self) {
68        self.is_active
69            .store(false, std::sync::atomic::Ordering::Relaxed);
70    }
71
72    /// Updates the subscription value
73    pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
74        let mut last_value = self.last_value.lock().await;
75
76        // Check if value has changed enough to notify
77        if let Some(old) = last_value.as_ref()
78            && !Self::value_changed(old, value, self.options.change_threshold)
79        {
80            return Ok(());
81        }
82
83        // Update value and send notification
84        *last_value = Some(value.clone());
85        drop(last_value);
86        let sender = {
87            let sender = self.sender.lock().await;
88            sender.clone()
89        };
90        sender
91            .send(value.clone())
92            .await
93            .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
94
95        Ok(())
96    }
97
98    /// Checks whether a value has changed enough to warrant a notification.
99    /// For floating-point types, uses the change_threshold as a deadband.
100    /// For all other types, triggers on any change.
101    fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
102        match (old, new) {
103            (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
104            (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
105            (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
106            (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
107            (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
108            (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
109            (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
110            (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
111            (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
112            (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
113            (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
114            (PlcValue::String(o), PlcValue::String(n)) => o != n,
115            // Different types or UDTs — always notify
116            _ => true,
117        }
118    }
119
120    /// Waits for the next value update
121    pub async fn wait_for_update(&self) -> Result<PlcValue> {
122        let mut receiver = self.receiver.lock().await;
123        let next_value = receiver.recv().await;
124        drop(receiver);
125        next_value.ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
126    }
127
128    /// Gets the last value received
129    pub async fn get_last_value(&self) -> Option<PlcValue> {
130        self.last_value.lock().await.clone()
131    }
132
133    async fn recv_next_value(&self) -> Option<PlcValue> {
134        let mut receiver = self.receiver.lock().await;
135        let next_value = receiver.recv().await;
136        drop(receiver);
137        next_value
138    }
139
140    /// Returns an async stream of value updates for this subscription.
141    ///
142    /// The stream yields each value as it is received from the background poll loop.
143    /// Use with `StreamExt` (e.g. `.next().await`) or `select!` for composition.
144    ///
145    /// # Example
146    ///
147    /// ```ignore
148    /// use futures_util::StreamExt;
149    ///
150    /// let subscription = client.subscribe_to_tag("MyTag", SubscriptionOptions::default()).await?;
151    /// let mut stream = subscription.into_stream();
152    /// while let Some(value) = stream.next().await {
153    ///     println!("Update: {:?}", value);
154    /// }
155    /// ```
156    pub fn into_stream(self: Arc<Self>) -> impl Stream<Item = PlcValue> + Send {
157        stream::unfold(self, |subscription| async move {
158            let next_value = subscription.recv_next_value().await;
159            next_value.map(|plc_value| (plc_value, subscription))
160        })
161    }
162}
163
164/// Manages multiple tag subscriptions
165#[derive(Debug, Clone)]
166pub struct SubscriptionManager {
167    subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
168}
169
170impl Default for SubscriptionManager {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl SubscriptionManager {
177    /// Creates a new subscription manager
178    pub fn new() -> Self {
179        Self {
180            subscriptions: Arc::new(Mutex::new(Vec::new())),
181        }
182    }
183
184    /// Adds a new subscription
185    pub async fn add_subscription(&self, subscription: TagSubscription) {
186        let mut subscriptions = self.subscriptions.lock().await;
187        subscriptions.push(subscription);
188    }
189
190    /// Removes a subscription
191    pub async fn remove_subscription(&self, tag_name: &str) {
192        let mut subscriptions = self.subscriptions.lock().await;
193        subscriptions.retain(|sub| sub.tag_path != tag_name);
194    }
195
196    /// Updates a value for all matching subscriptions
197    pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
198        let subscriptions = {
199            let subscriptions = self.subscriptions.lock().await;
200            subscriptions.clone()
201        };
202        for subscription in &subscriptions {
203            if subscription.tag_path == tag_name && subscription.is_active() {
204                subscription.update_value(value).await?;
205            }
206        }
207        Ok(())
208    }
209
210    /// Gets all active subscriptions
211    pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
212        let subscriptions = self.subscriptions.lock().await;
213        subscriptions.clone()
214    }
215
216    /// Gets a specific subscription by tag name
217    pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
218        let subscriptions = self.subscriptions.lock().await;
219        subscriptions
220            .iter()
221            .find(|sub| sub.tag_path == tag_name)
222            .cloned()
223    }
224}