Skip to main content

opcua_client/session/services/subscriptions/
callbacks.rs

1use opcua_types::{
2    match_extension_object_owned, DataChangeNotification, DataValue, EventNotificationList,
3    NotificationMessage, StatusChangeNotification, Variant,
4};
5
6use crate::{session::services::subscriptions::MonitoredItemMap, MonitoredItem};
7
8/// A trait for handling subscription notifications.
9/// Typically, you will want to use OnSubscriptionNotification instead,
10/// which has a blanket implementation for this trait.
11pub trait OnSubscriptionNotificationCore: Send + Sync {
12    /// Called when a notification is received on a subscription.
13    fn on_subscription_notification(
14        &mut self,
15        notification: NotificationMessage,
16        monitored_items: MonitoredItemMap<'_>,
17    );
18}
19
20impl<T> OnSubscriptionNotificationCore for T
21where
22    T: OnSubscriptionNotification + Send + Sync,
23{
24    fn on_subscription_notification(
25        &mut self,
26        notification: NotificationMessage,
27        monitored_items: MonitoredItemMap<'_>,
28    ) {
29        let Some(notifications) = notification.notification_data else {
30            return;
31        };
32
33        for obj in notifications {
34            match_extension_object_owned!(obj,
35                v: DataChangeNotification => {
36                    for notif in v.monitored_items.into_iter().flatten() {
37                        let item = monitored_items.get(notif.client_handle);
38
39                        if let Some(item) = item {
40                            self.on_data_value(notif.value, item);
41                        } else {
42                            tracing::warn!("Received notification for unknown monitored item {}", notif.client_handle);
43                        }
44                    }
45                },
46                v: EventNotificationList => {
47                    for notif in v.events.into_iter().flatten() {
48                        let item = monitored_items.get(notif.client_handle);
49
50                        if let Some(item) = item {
51                            self.on_event(notif.event_fields, item);
52                        }
53                    }
54                },
55                v: StatusChangeNotification => {
56                    self.on_subscription_status_change(v);
57                }
58            )
59        }
60    }
61}
62
63/// A set of callbacks for notifications on a subscription.
64/// You may implement this on your own struct, or simply use [SubscriptionCallbacks]
65/// for a simple collection of closures.
66pub trait OnSubscriptionNotification: Send + Sync {
67    /// Called when a subscription changes state on the server.
68    #[allow(unused)]
69    fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {}
70
71    /// Called for each data value change.
72    #[allow(unused)]
73    fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {}
74
75    /// Called for each received event.
76    #[allow(unused)]
77    fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {}
78}
79
80type StatusChangeCallbackFun = dyn FnMut(StatusChangeNotification) + Send + Sync;
81type DataChangeCallbackFun = dyn FnMut(DataValue, &MonitoredItem) + Send + Sync;
82type EventCallbackFun = dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync;
83
84/// A convenient wrapper around a set of callback functions that implements [OnSubscriptionNotification]
85pub struct SubscriptionCallbacks {
86    status_change: Box<StatusChangeCallbackFun>,
87    data_value: Box<DataChangeCallbackFun>,
88    event: Box<EventCallbackFun>,
89}
90
91impl SubscriptionCallbacks {
92    /// Create a new subscription callback wrapper.
93    ///
94    /// # Arguments
95    ///
96    /// * `status_change` - Called when a subscription changes state on the server.
97    /// * `data_value` - Called for each received data value.
98    /// * `event` - Called for each received event.
99    pub fn new(
100        status_change: impl FnMut(StatusChangeNotification) + Send + Sync + 'static,
101        data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static,
102        event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
103    ) -> Self {
104        Self {
105            status_change: Box::new(status_change) as Box<StatusChangeCallbackFun>,
106            data_value: Box::new(data_value) as Box<DataChangeCallbackFun>,
107            event: Box::new(event) as Box<EventCallbackFun>,
108        }
109    }
110}
111
112impl OnSubscriptionNotification for SubscriptionCallbacks {
113    fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {
114        (self.status_change)(notification);
115    }
116
117    fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
118        (self.data_value)(notification, item);
119    }
120
121    fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
122        (self.event)(event_fields, item);
123    }
124}
125
126/// A wrapper around a data change callback that implements [OnSubscriptionNotification]
127pub struct DataChangeCallback {
128    data_value: Box<DataChangeCallbackFun>,
129}
130
131impl DataChangeCallback {
132    /// Create a new data change callback wrapper.
133    ///
134    /// # Arguments
135    ///
136    /// * `data_value` - Called for each received data value.
137    pub fn new(data_value: impl FnMut(DataValue, &MonitoredItem) + Send + Sync + 'static) -> Self {
138        Self {
139            data_value: Box::new(data_value)
140                as Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync>,
141        }
142    }
143}
144
145impl OnSubscriptionNotification for DataChangeCallback {
146    fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
147        (self.data_value)(notification, item);
148    }
149}
150
151/// A wrapper around an event callback that implements [OnSubscriptionNotification]
152pub struct EventCallback {
153    event: Box<EventCallbackFun>,
154}
155
156impl EventCallback {
157    /// Create a new event callback wrapper.
158    ///
159    /// # Arguments
160    ///
161    /// * `data_value` - Called for each received data value.
162    pub fn new(
163        event: impl FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync + 'static,
164    ) -> Self {
165        Self {
166            event: Box::new(event)
167                as Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync>,
168        }
169    }
170}
171
172impl OnSubscriptionNotification for EventCallback {
173    fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
174        (self.event)(event_fields, item);
175    }
176}