opcua_client/session/services/subscriptions/
callbacks.rs1use opcua_types::{
2 match_extension_object_owned, DataChangeNotification, DataValue, EventNotificationList,
3 NotificationMessage, StatusChangeNotification, Variant,
4};
5
6use crate::{session::services::subscriptions::MonitoredItemMap, MonitoredItem};
7
8pub trait OnSubscriptionNotificationCore: Send + Sync {
12 fn on_subscription_notification(
14 &mut self,
15 notification: NotificationMessage,
16 monitored_items: MonitoredItemMap<'_>,
17 );
18}
19
20impl<T> OnSubscriptionNotificationCore for T
21where
22 T: OnSubscriptionNotification + Send + Sync,
23{
24 fn on_subscription_notification(
25 &mut self,
26 notification: NotificationMessage,
27 monitored_items: MonitoredItemMap<'_>,
28 ) {
29 let Some(notifications) = notification.notification_data else {
30 return;
31 };
32
33 for obj in notifications {
34 match_extension_object_owned!(obj,
35 v: DataChangeNotification => {
36 for notif in v.monitored_items.into_iter().flatten() {
37 let item = monitored_items.get(notif.client_handle);
38
39 if let Some(item) = item {
40 self.on_data_value(notif.value, item);
41 } else {
42 tracing::warn!("Received notification for unknown monitored item {}", notif.client_handle);
43 }
44 }
45 },
46 v: EventNotificationList => {
47 for notif in v.events.into_iter().flatten() {
48 let item = monitored_items.get(notif.client_handle);
49
50 if let Some(item) = item {
51 self.on_event(notif.event_fields, item);
52 }
53 }
54 },
55 v: StatusChangeNotification => {
56 self.on_subscription_status_change(v);
57 }
58 )
59 }
60 }
61}
62
63pub trait OnSubscriptionNotification: Send + Sync {
67 #[allow(unused)]
69 fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {}
70
71 #[allow(unused)]
73 fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {}
74
75 #[allow(unused)]
77 fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {}
78}
79
80type StatusChangeCallbackFun = dyn FnMut(StatusChangeNotification) + Send + Sync;
81type DataChangeCallbackFun = dyn FnMut(DataValue, &MonitoredItem) + Send + Sync;
82type EventCallbackFun = dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync;
83
84pub struct SubscriptionCallbacks {
86 status_change: Box<StatusChangeCallbackFun>,
87 data_value: Box<DataChangeCallbackFun>,
88 event: Box<EventCallbackFun>,
89}
90
91impl SubscriptionCallbacks {
92 pub fn new(
100 status_change: impl FnMut(StatusChangeNotification) + Send + Sync + 'static,
101 data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static,
102 event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
103 ) -> Self {
104 Self {
105 status_change: Box::new(status_change) as Box<StatusChangeCallbackFun>,
106 data_value: Box::new(data_value) as Box<DataChangeCallbackFun>,
107 event: Box::new(event) as Box<EventCallbackFun>,
108 }
109 }
110}
111
112impl OnSubscriptionNotification for SubscriptionCallbacks {
113 fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {
114 (self.status_change)(notification);
115 }
116
117 fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
118 (self.data_value)(notification, item);
119 }
120
121 fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
122 (self.event)(event_fields, item);
123 }
124}
125
126pub struct DataChangeCallback {
128 data_value: Box<DataChangeCallbackFun>,
129}
130
131impl DataChangeCallback {
132 pub fn new(data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static) -> Self {
138 Self {
139 data_value: Box::new(data_value)
140 as Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync>,
141 }
142 }
143}
144
145impl OnSubscriptionNotification for DataChangeCallback {
146 fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
147 (self.data_value)(notification, item);
148 }
149}
150
151pub struct EventCallback {
153 event: Box<EventCallbackFun>,
154}
155
156impl EventCallback {
157 pub fn new(
163 event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
164 ) -> Self {
165 Self {
166 event: Box::new(event)
167 as Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync>,
168 }
169 }
170}
171
172impl OnSubscriptionNotification for EventCallback {
173 fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
174 (self.event)(event_fields, item);
175 }
176}