use opcua_types::{
match_extension_object_owned, DataChangeNotification, DataValue, EventNotificationList,
NotificationMessage, StatusChangeNotification, Variant,
};
use crate::{session::services::subscriptions::MonitoredItemMap, MonitoredItem};
/// Low-level receiver for subscription notifications.
///
/// An implementor is handed a complete [`NotificationMessage`] together with
/// a [`MonitoredItemMap`] and decides how to process it. Every type that
/// implements [`OnSubscriptionNotification`] gets this trait through a
/// blanket implementation in this file.
pub trait OnSubscriptionNotificationCore: Send + Sync {
/// Handle one notification message.
///
/// * `notification` - the message, passed by value.
/// * `monitored_items` - lookup used to find the monitored item that a
///   notification's client handle refers to.
fn on_subscription_notification(
&mut self,
notification: NotificationMessage,
monitored_items: MonitoredItemMap<'_>,
);
}
/// Blanket adapter that turns any [`OnSubscriptionNotification`] into an
/// [`OnSubscriptionNotificationCore`] by unpacking the notification message
/// and forwarding each entry to the matching typed callback.
impl<T> OnSubscriptionNotificationCore for T
where
    T: OnSubscriptionNotification + Send + Sync,
{
    /// Dispatch every entry of `notification.notification_data`.
    ///
    /// * Data changes are forwarded to `on_data_value`, one call per
    ///   monitored item notification.
    /// * Events are forwarded to `on_event`, one call per event.
    /// * Status changes are forwarded to `on_subscription_status_change`.
    ///
    /// Data changes and events whose client handle is not present in
    /// `monitored_items` are not delivered; a warning is logged instead.
    /// A message with no notification data is ignored.
    fn on_subscription_notification(
        &mut self,
        notification: NotificationMessage,
        monitored_items: MonitoredItemMap<'_>,
    ) {
        // Nothing to dispatch when the message carries no notification data.
        let Some(notifications) = notification.notification_data else {
            return;
        };

        for obj in notifications {
            // NOTE(review): what happens to an object that matches none of the
            // listed types is decided by `match_extension_object_owned!`.
            match_extension_object_owned!(obj,
                v: DataChangeNotification => {
                    // `flatten` skips an absent (`None`) list of items.
                    for notif in v.monitored_items.into_iter().flatten() {
                        let item = monitored_items.get(notif.client_handle);
                        if let Some(item) = item {
                            self.on_data_value(notif.value, item);
                        } else {
                            tracing::warn!("Received notification for unknown monitored item {}", notif.client_handle);
                        }
                    }
                },
                v: EventNotificationList => {
                    for notif in v.events.into_iter().flatten() {
                        let item = monitored_items.get(notif.client_handle);
                        if let Some(item) = item {
                            self.on_event(notif.event_fields, item);
                        } else {
                            // Mirror the data-change branch so dropped events
                            // are visible in the logs rather than vanishing.
                            tracing::warn!("Received event for unknown monitored item {}", notif.client_handle);
                        }
                    }
                },
                v: StatusChangeNotification => {
                    self.on_subscription_status_change(v);
                }
            )
        }
    }
}
/// Typed callbacks for the individual kinds of subscription notification.
///
/// Every method has an empty default body, so an implementor only needs to
/// override the callbacks it cares about. The blanket
/// `OnSubscriptionNotificationCore` implementation in this file calls these
/// methods while unpacking a notification message.
pub trait OnSubscriptionNotification: Send + Sync {
/// Called with a status change notification. Does nothing by default.
// `unused` is allowed because the empty default body ignores the argument.
#[allow(unused)]
fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {}
/// Called with a new data value and the monitored item it was reported
/// for. Does nothing by default.
// `unused` is allowed because the empty default body ignores the arguments.
#[allow(unused)]
fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {}
/// Called with the fields of an event (possibly absent) and the monitored
/// item it was reported for. Does nothing by default.
// `unused` is allowed because the empty default body ignores the arguments.
#[allow(unused)]
fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {}
}
/// Unsized closure type for status change callbacks; stored boxed.
type StatusChangeCallbackFun = dyn FnMut(StatusChangeNotification) + Send + Sync;
/// Unsized closure type for data value callbacks; stored boxed.
type DataChangeCallbackFun = dyn FnMut(DataValue, &MonitoredItem) + Send + Sync;
/// Unsized closure type for event callbacks; stored boxed.
type EventCallbackFun = dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync;
/// Bundle of three boxed closures, one per kind of notification callback
/// defined by `OnSubscriptionNotification`.
pub struct SubscriptionCallbacks {
/// Closure invoked from `on_subscription_status_change`.
status_change: Box<StatusChangeCallbackFun>,
/// Closure invoked from `on_data_value`.
data_value: Box<DataChangeCallbackFun>,
/// Closure invoked from `on_event`.
event: Box<EventCallbackFun>,
}
impl SubscriptionCallbacks {
    /// Build a callback bundle from three closures.
    ///
    /// * `status_change` - receives each status change notification.
    /// * `data_value` - receives each data value together with its
    ///   monitored item.
    /// * `event` - receives each event's fields together with its monitored
    ///   item.
    pub fn new(
        status_change: impl FnMut(StatusChangeNotification) + Send + Sync + 'static,
        data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static,
        event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
    ) -> Self {
        // The annotated bindings erase each concrete closure type into the
        // corresponding trait object.
        let status_change: Box<StatusChangeCallbackFun> = Box::new(status_change);
        let data_value: Box<DataChangeCallbackFun> = Box::new(data_value);
        let event: Box<EventCallbackFun> = Box::new(event);

        Self {
            status_change,
            data_value,
            event,
        }
    }
}
/// Forwards each callback to the closure stored for it.
impl OnSubscriptionNotification for SubscriptionCallbacks {
    /// Pass the status change to the stored `status_change` closure.
    fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {
        let callback = &mut *self.status_change;
        callback(notification);
    }

    /// Pass the data value and its item to the stored `data_value` closure.
    fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
        let callback = &mut *self.data_value;
        callback(notification, item);
    }

    /// Pass the event fields and their item to the stored `event` closure.
    fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
        let callback = &mut *self.event;
        callback(event_fields, item);
    }
}
/// Wrapper around a single boxed closure that handles data values only.
/// Its `OnSubscriptionNotification` implementation overrides `on_data_value`
/// and leaves the other callbacks at their empty defaults.
pub struct DataChangeCallback {
/// Closure invoked from `on_data_value`.
data_value: Box<DataChangeCallbackFun>,
}
impl DataChangeCallback {
    /// Wrap `data_value`, a closure that receives each data value together
    /// with the monitored item it belongs to.
    pub fn new(data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static) -> Self {
        // The annotation erases the concrete closure type into a trait object.
        let data_value: Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync> =
            Box::new(data_value);
        Self { data_value }
    }
}
/// Only data values are handled; the other callbacks keep their defaults.
impl OnSubscriptionNotification for DataChangeCallback {
    /// Pass the data value and its item to the wrapped closure.
    fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
        let callback = &mut *self.data_value;
        callback(notification, item);
    }
}
/// Wrapper around a single boxed closure that handles events only.
/// Its `OnSubscriptionNotification` implementation overrides `on_event` and
/// leaves the other callbacks at their empty defaults.
pub struct EventCallback {
/// Closure invoked from `on_event`.
event: Box<EventCallbackFun>,
}
impl EventCallback {
    /// Wrap `event`, a closure that receives each event's fields (possibly
    /// absent) together with the monitored item the event belongs to.
    pub fn new(
        event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
    ) -> Self {
        // The annotation erases the concrete closure type into a trait object.
        let event: Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync> =
            Box::new(event);
        Self { event }
    }
}
/// Only events are handled; the other callbacks keep their defaults.
impl OnSubscriptionNotification for EventCallback {
    /// Pass the event fields and their item to the wrapped closure.
    fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
        let callback = &mut *self.event;
        callback(event_fields, item);
    }
}