use crate::error::{EtherNetIpError, Result};
use crate::PlcValue;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use async_stream::stream;
use futures::Stream;
#[derive(Debug, Clone)]
pub struct SubscriptionOptions {
pub update_rate: u32,
pub change_threshold: f32,
pub timeout: u32,
}
impl Default for SubscriptionOptions {
fn default() -> Self {
Self {
update_rate: 100, change_threshold: 0.001, timeout: 5000, }
}
}
#[derive(Debug, Clone)]
pub struct TagSubscription {
pub tag_path: String,
pub options: SubscriptionOptions,
pub last_value: Arc<Mutex<Option<PlcValue>>>,
pub sender: Arc<Mutex<mpsc::Sender<PlcValue>>>,
pub receiver: Arc<Mutex<mpsc::Receiver<PlcValue>>>,
pub is_active: Arc<AtomicBool>,
}
impl TagSubscription {
pub fn new(tag_name: String, options: SubscriptionOptions) -> Self {
let (sender, receiver) = mpsc::channel(100); Self {
tag_path: tag_name,
options,
last_value: Arc::new(Mutex::new(None)),
sender: Arc::new(Mutex::new(sender)),
receiver: Arc::new(Mutex::new(receiver)),
is_active: Arc::new(AtomicBool::new(true)),
}
}
pub fn is_active(&self) -> bool {
self.is_active.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn stop(&self) {
self.is_active
.store(false, std::sync::atomic::Ordering::Relaxed);
}
pub async fn update_value(&self, value: &PlcValue) -> Result<()> {
let mut last_value = self.last_value.lock().await;
if let Some(old) = last_value.as_ref() {
if !Self::value_changed(old, value, self.options.change_threshold) {
return Ok(());
}
}
*last_value = Some(value.clone());
let sender = self.sender.lock().await;
sender
.send(value.clone())
.await
.map_err(|e| EtherNetIpError::Subscription(format!("Failed to send update: {e}")))?;
Ok(())
}
fn value_changed(old: &PlcValue, new: &PlcValue, threshold: f32) -> bool {
match (old, new) {
(PlcValue::Real(o), PlcValue::Real(n)) => (*n - *o).abs() >= threshold,
(PlcValue::Lreal(o), PlcValue::Lreal(n)) => (*n - *o).abs() >= threshold as f64,
(PlcValue::Bool(o), PlcValue::Bool(n)) => o != n,
(PlcValue::Sint(o), PlcValue::Sint(n)) => o != n,
(PlcValue::Int(o), PlcValue::Int(n)) => o != n,
(PlcValue::Dint(o), PlcValue::Dint(n)) => o != n,
(PlcValue::Lint(o), PlcValue::Lint(n)) => o != n,
(PlcValue::Usint(o), PlcValue::Usint(n)) => o != n,
(PlcValue::Uint(o), PlcValue::Uint(n)) => o != n,
(PlcValue::Udint(o), PlcValue::Udint(n)) => o != n,
(PlcValue::Ulint(o), PlcValue::Ulint(n)) => o != n,
(PlcValue::String(o), PlcValue::String(n)) => o != n,
_ => true,
}
}
pub async fn wait_for_update(&self) -> Result<PlcValue> {
let mut receiver = self.receiver.lock().await;
receiver
.recv()
.await
.ok_or_else(|| EtherNetIpError::Subscription("Channel closed".to_string()))
}
pub async fn get_last_value(&self) -> Option<PlcValue> {
self.last_value.lock().await.clone()
}
pub fn into_stream(self: Arc<Self>) -> impl Stream<Item = PlcValue> + Send {
stream! {
loop {
let v = {
let mut receiver = self.receiver.lock().await;
receiver.recv().await
};
match v {
Some(plc_value) => yield plc_value,
None => break,
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct SubscriptionManager {
subscriptions: Arc<Mutex<Vec<TagSubscription>>>,
}
impl Default for SubscriptionManager {
fn default() -> Self {
Self::new()
}
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
subscriptions: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn add_subscription(&self, subscription: TagSubscription) {
let mut subscriptions = self.subscriptions.lock().await;
subscriptions.push(subscription);
}
pub async fn remove_subscription(&self, tag_name: &str) {
let mut subscriptions = self.subscriptions.lock().await;
subscriptions.retain(|sub| sub.tag_path != tag_name);
}
pub async fn update_value(&self, tag_name: &str, value: &PlcValue) -> Result<()> {
let subscriptions = self.subscriptions.lock().await;
for subscription in subscriptions.iter() {
if subscription.tag_path == tag_name && subscription.is_active() {
subscription.update_value(value).await?;
}
}
Ok(())
}
pub async fn get_subscriptions(&self) -> Vec<TagSubscription> {
let subscriptions = self.subscriptions.lock().await;
subscriptions.clone()
}
pub async fn get_subscription(&self, tag_name: &str) -> Option<TagSubscription> {
let subscriptions = self.subscriptions.lock().await;
subscriptions
.iter()
.find(|sub| sub.tag_path == tag_name)
.cloned()
}
}