Skip to main content

rust_ethernet_ip/
subscription.rs

1use crate::error::{EtherNetIpError, Result};
2use crate::PlcValue;
3use std::sync::atomic::AtomicBool;
4use std::sync::Arc;
5use tokio::sync::{mpsc, Mutex};
6
7use async_stream::stream;
8use futures::Stream;
9
/// Tunable settings applied to a tag subscription.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Update rate, expressed in milliseconds.
    pub update_rate: u32,
    /// Deadband for numeric values: the absolute difference a floating-point
    /// reading must reach before it is treated as changed.
    pub change_threshold: f32,
    /// Timeout, expressed in milliseconds.
    pub timeout: u32,
}
20
21impl Default for SubscriptionOptions {
22    fn default() -> Self {
23        Self {
24            update_rate: 100,        // 100ms default update rate
25            change_threshold: 0.001, // 0.1% change threshold
26            timeout: 5000,           // 5 second timeout
27        }
28    }
29}
30
31/// Represents a subscription to a PLC tag
32#[derive(Debug, Clone)]
33pub struct TagSubscription {
34    /// The path of the subscribed tag
35    pub tag_path: String,
36    /// Subscription configuration
37    pub options: SubscriptionOptions,
38    /// Last received value
39    pub last_value: Arc<Mutex<Option<PlcValue>>>,
40    /// Channel sender for value updates
41    pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
42    /// Channel receiver for value updates
43    pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
44    /// Whether the subscription is active
45    pub is_active: Arc<AtomicBool>,
46}
47
48impl TagSubscription {
49    /// Creates a new tag subscription
50    pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
51        let (sender, receiver) = mpsc::channel(100); // Buffer size of 100
52        Self {
53            tag_path: tag_name,
54            options,
55            last_value: Arc::new(Mutex::new(None)),
56            sender: Arc::new(Mutex::new(sender)),
57            receiver: Arc::new(Mutex::new(receiver)),
58            is_active: Arc::new(AtomicBool::new(true)),
59        }
60    }
61
62    /// Checks if the subscription is active
63    pub fn is_active(&self) -> bool {
64        self.is_active.load(std::sync::atomic::Ordering::Relaxed)
65    }
66
67    /// Stops the subscription
68    pub fn stop(&self) {
69        self.is_active
70            .store(false, std::sync::atomic::Ordering::Relaxed);
71    }
72
73    /// Updates the subscription value
74    pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
75        let mut last_value = self.last_value.lock().await;
76
77        // Check if value has changed enough to notify
78        if let Some(old) = last_value.as_ref() {
79            if !Self::value_changed(old, value, self.options.change_threshold) {
80                return Ok(());
81            }
82        }
83
84        // Update value and send notification
85        *last_value = Some(value.clone());
86        let sender = self.sender.lock().await;
87        sender
88            .send(value.clone())
89            .await
90            .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
91
92        Ok(())
93    }
94
95    /// Checks whether a value has changed enough to warrant a notification.
96    /// For floating-point types, uses the change_threshold as a deadband.
97    /// For all other types, triggers on any change.
98    fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
99        match (old, new) {
100            (PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
101            (PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
102            (PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
103            (PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
104            (PlcValue::Int(o), PlcValue::Int(n)) => o != n,
105            (PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
106            (PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
107            (PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
108            (PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
109            (PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
110            (PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
111            (PlcValue::String(o), PlcValue::String(n)) => o != n,
112            // Different types or UDTs — always notify
113            _ => true,
114        }
115    }
116
117    /// Waits for the next value update
118    pub async fn wait_for_update(&self) -> Result<PlcValue> {
119        let mut receiver = self.receiver.lock().await;
120        receiver
121            .recv()
122            .await
123            .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
124    }
125
126    /// Gets the last value received
127    pub async fn get_last_value(&self) -> Option<PlcValue> {
128        self.last_value.lock().await.clone()
129    }
130
131    /// Returns an async stream of value updates for this subscription.
132    ///
133    /// The stream yields each value as it is received from the background poll loop.
134    /// Use with `StreamExt` (e.g. `.next().await`) or `select!` for composition.
135    ///
136    /// # Example
137    ///
138    /// ```ignore
139    /// use futures_util::StreamExt;
140    ///
141    /// let subscription = client.subscribe_to_tag("MyTag", SubscriptionOptions::default()).await?;
142    /// let mut stream = subscription.into_stream();
143    /// while let Some(value) = stream.next().await {
144    ///     println!("Update: {:?}", value);
145    /// }
146    /// ```
147    pub fn into_stream(self: Arc<Self>) -> impl Stream<Item = PlcValue> + Send {
148        stream! {
149            loop {
150                let v = {
151                    let mut receiver = self.receiver.lock().await;
152                    receiver.recv().await
153                };
154                match v {
155                    Some(plc_value) => yield plc_value,
156                    None => break,
157                }
158            }
159        }
160    }
161}
162
163/// Manages multiple tag subscriptions
164#[derive(Debug, Clone)]
165pub struct SubscriptionManager {
166    subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
167}
168
169impl Default for SubscriptionManager {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
175impl SubscriptionManager {
176    /// Creates a new subscription manager
177    pub fn new() -> Self {
178        Self {
179            subscriptions: Arc::new(Mutex::new(Vec::new())),
180        }
181    }
182
183    /// Adds a new subscription
184    pub async fn add_subscription(&self, subscription: TagSubscription) {
185        let mut subscriptions = self.subscriptions.lock().await;
186        subscriptions.push(subscription);
187    }
188
189    /// Removes a subscription
190    pub async fn remove_subscription(&self, tag_name: &str) {
191        let mut subscriptions = self.subscriptions.lock().await;
192        subscriptions.retain(|sub| sub.tag_path != tag_name);
193    }
194
195    /// Updates a value for all matching subscriptions
196    pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
197        let subscriptions = self.subscriptions.lock().await;
198        for subscription in subscriptions.iter() {
199            if subscription.tag_path == tag_name && subscription.is_active() {
200                subscription.update_value(value).await?;
201            }
202        }
203        Ok(())
204    }
205
206    /// Gets all active subscriptions
207    pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
208        let subscriptions = self.subscriptions.lock().await;
209        subscriptions.clone()
210    }
211
212    /// Gets a specific subscription by tag name
213    pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
214        let subscriptions = self.subscriptions.lock().await;
215        subscriptions
216            .iter()
217            .find(|sub| sub.tag_path == tag_name)
218            .cloned()
219    }
220}