// rust_ethernet_ip/subscription.rs

1use tokio::sync::{mpsc, Mutex};
2use std::sync::Arc;
3use std::sync::atomic::AtomicBool;
4use crate::error::{Result, EtherNetIpError};
5use crate::PlcValue;
6
/// Tunable settings attached to a tag subscription.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Update rate, in milliseconds
    pub update_rate: u32,
    /// Smallest change in a numeric value that counts as an update.
    ///
    /// `TagSubscription::update_value` compares this against the absolute
    /// difference between the previous and the new `Real` value.
    pub change_threshold: f32,
    /// Timeout, in milliseconds
    pub timeout: u32,
}

impl Default for SubscriptionOptions {
    /// Returns the stock configuration: 100 ms update rate, a change
    /// threshold of 0.001 and a 5000 ms timeout.
    fn default() -> Self {
        const DEFAULT_UPDATE_RATE_MS: u32 = 100;
        // Absolute difference, not a percentage of the previous value.
        const DEFAULT_CHANGE_THRESHOLD: f32 = 0.001;
        const DEFAULT_TIMEOUT_MS: u32 = 5000;

        SubscriptionOptions {
            update_rate: DEFAULT_UPDATE_RATE_MS,
            change_threshold: DEFAULT_CHANGE_THRESHOLD,
            timeout: DEFAULT_TIMEOUT_MS,
        }
    }
}
27
28/// Represents a subscription to a PLC tag
29#[derive(Debug, Clone)]
30pub struct TagSubscription {
31    /// The path of the subscribed tag
32    pub tag_path: String,
33    /// Subscription configuration
34    pub options: SubscriptionOptions,
35    /// Last received value
36    pub last_value: Arc<Mutex<Option<PlcValue>>>,
37    /// Channel sender for value updates
38    pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
39    /// Channel receiver for value updates
40    pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
41    /// Whether the subscription is active
42    pub is_active: Arc<AtomicBool>,
43}
44
45impl TagSubscription {
46    /// Creates a new tag subscription
47    pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
48        let (sender, receiver) = mpsc::channel(100); // Buffer size of 100
49        Self {
50            tag_path: tag_name,
51            options,
52            last_value: Arc::new(Mutex::new(None)),
53            sender: Arc::new(Mutex::new(sender)),
54            receiver: Arc::new(Mutex::new(receiver)),
55            is_active: Arc::new(AtomicBool::new(true)),
56        }
57    }
58
59    /// Checks if the subscription is active
60    pub fn is_active(&self) -> bool {
61        self.is_active.load(std::sync::atomic::Ordering::Relaxed)
62    }
63
64    /// Stops the subscription
65    pub fn stop(&self) {
66        self.is_active.store(false, std::sync::atomic::Ordering::Relaxed);
67    }
68
69    /// Updates the subscription value
70    pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
71        let mut last_value = self.last_value.lock().await;
72        
73        // Check if value has changed significantly
74        if let (Some(PlcValue::Real(old)), PlcValue::Real(new)) = (last_value.as_ref(), value) {
75            if (*new - *old).abs() < self.options.change_threshold {
76                return Ok(());
77            }
78        }
79
80        // Update value and send notification
81        *last_value = Some(value.clone());
82        let sender = self.sender.lock().await;
83        sender.send(value.clone()).await
84            .map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {}", e)))?;
85        
86        Ok(())
87    }
88
89    /// Waits for the next value update
90    pub async fn wait_for_update(&self) -> Result<PlcValue> {
91        let mut receiver = self.receiver.lock().await;
92        receiver.recv().await
93            .ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
94    }
95
96    /// Gets the last value received
97    pub async fn get_last_value(&self) -> Option<PlcValue> {
98        self.last_value.lock().await.clone()
99    }
100}
101
102/// Manages multiple tag subscriptions
103#[derive(Debug, Clone)]
104pub struct SubscriptionManager {
105    subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
106}
107
108impl Default for SubscriptionManager {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114impl SubscriptionManager {
115    /// Creates a new subscription manager
116    pub fn new() -> Self {
117        Self {
118            subscriptions: Arc::new(Mutex::new(Vec::new())),
119        }
120    }
121
122    /// Adds a new subscription
123    pub async fn add_subscription(&self, subscription: TagSubscription) {
124        let mut subscriptions = self.subscriptions.lock().await;
125        subscriptions.push(subscription);
126    }
127
128    /// Removes a subscription
129    pub async fn remove_subscription(&self, tag_name: &str) {
130        let mut subscriptions = self.subscriptions.lock().await;
131        subscriptions.retain(|sub| sub.tag_path != tag_name);
132    }
133
134    /// Updates a value for all matching subscriptions
135    pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
136        let subscriptions = self.subscriptions.lock().await;
137        for subscription in subscriptions.iter() {
138            if subscription.tag_path == tag_name && subscription.is_active() {
139                subscription.update_value(value).await?;
140            }
141        }
142        Ok(())
143    }
144
145    /// Gets all active subscriptions
146    pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
147        let subscriptions = self.subscriptions.lock().await;
148        subscriptions.clone()
149    }
150
151    /// Gets a specific subscription by tag name
152    pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
153        let subscriptions = self.subscriptions.lock().await;
154        subscriptions.iter()
155            .find(|sub| sub.tag_path == tag_name)
156            .cloned()
157    }
158}