Skip to main content

opcua_server/subscriptions/
notify.rs

1use hashbrown::HashMap;
2use opcua_nodes::Event;
3use opcua_types::{node_id::IntoNodeIdRef, AttributeId, DataValue, DateTime, ObjectId, Variant};
4use parking_lot::RwLockReadGuard;
5
6use crate::{
7    subscriptions::{MonitoredItemEntry, MonitoredItemKeyRef, SubscriptionCacheInner},
8    MonitoredItemHandle,
9};
10
11/// Handle for notifying the subscription cache of a batch of changes,
12/// without allocating NodeIds unnecessarily.
13/// Notifications are actually submitted once the notifier is dropped.
14pub struct SubscriptionDataNotifier<'a> {
15    lock: RwLockReadGuard<'a, SubscriptionCacheInner>,
16    by_subscription: HashMap<u32, Vec<(MonitoredItemHandle, DataValue)>>,
17}
18
19/// Notifier for a specific node.
20pub struct SubscriptionDataNotifierBatch<'a> {
21    items: &'a HashMap<MonitoredItemHandle, MonitoredItemEntry>,
22    by_subscription: &'a mut HashMap<u32, Vec<(MonitoredItemHandle, DataValue)>>,
23}
24
25impl<'a> SubscriptionDataNotifierBatch<'a> {
26    /// Notify the referenced node of a change in value by providing a DataValue.
27    pub fn data_value(&mut self, value: impl Into<DataValue>) {
28        let dv = value.into();
29        for (handle, entry) in self.items {
30            if !entry.enabled {
31                continue;
32            }
33            self.by_subscription
34                .entry(handle.subscription_id)
35                .or_default()
36                .push((*handle, dv.clone()));
37        }
38    }
39
40    /// Submit a data value to a specific monitored item.
41    pub fn data_value_to_item(
42        &mut self,
43        value: impl Into<DataValue>,
44        handle: &MonitoredItemHandle,
45    ) {
46        self.by_subscription
47            .entry(handle.subscription_id)
48            .or_default()
49            .push((*handle, value.into()));
50    }
51
52    /// Notify the referenced node of a change in value by providing a Variant and source timestamp.
53    pub fn value(&mut self, value: impl Into<Variant>, source_timestamp: DateTime) {
54        let dv = DataValue::new_at(value, source_timestamp);
55        self.data_value(dv);
56    }
57
58    /// Get an iterator over the matched monitored item entries. This can be used to
59    /// conditionally sample using parameters for each monitored item.
60    ///
61    /// This only returns monitored item entries that are enabled.
62    pub fn entries<'b>(
63        &'b self,
64    ) -> impl Iterator<Item = (&'a MonitoredItemHandle, &'a MonitoredItemEntry)> + 'a {
65        self.items.iter().filter(|e| e.1.enabled)
66    }
67}
68
69impl<'a> SubscriptionDataNotifier<'a> {
70    pub(super) fn new(lock: RwLockReadGuard<'a, SubscriptionCacheInner>) -> Self {
71        Self {
72            lock,
73            by_subscription: Default::default(),
74        }
75    }
76
77    /// Maybe sample for the given node ID and attribute ID.
78    ///
79    /// This allows you to only sample when a user is listening.
80    ///
81    /// # Example
82    ///
83    /// ```ignore
84    /// if let Some(mut notif) = notifier.notify_for((1, 123), AttributeId::Value) {
85    ///     notif.value(Variant::Int32(42), DateTime::now());
86    ///     notif.value(Variant::Int32(45), DateTime::now());
87    /// }
88    /// ```
89    pub fn notify_for<'b, 'c>(
90        &'b mut self,
91        node_id: impl IntoNodeIdRef<'c>,
92        attribute_id: AttributeId,
93    ) -> Option<SubscriptionDataNotifierBatch<'b>> {
94        if attribute_id == AttributeId::EventNotifier {
95            return None;
96        }
97
98        let items = self.lock.monitored_items.get(&MonitoredItemKeyRef {
99            id: node_id.into_node_id_ref(),
100            attribute_id,
101        })?;
102        Some(SubscriptionDataNotifierBatch {
103            items,
104            by_subscription: &mut self.by_subscription,
105        })
106    }
107
108    /// Notify the subscription cache of a change in value for the given node ID and attribute ID.
109    ///
110    /// # Arguments
111    ///
112    /// * `node_id` - The node ID or reference to node ID for the changed node.
113    /// * `attribute_id` - The attribute ID of the changed value. Note that this may not be EventNotifier.
114    /// * `value` - The new value as a DataValue or something convertible to a DataValue.
115    pub fn notify(
116        &mut self,
117        node_id: impl IntoNodeIdRef<'a>,
118        attribute_id: AttributeId,
119        value: impl Into<DataValue>,
120    ) {
121        if let Some(mut batch) = self.notify_for(node_id, attribute_id) {
122            batch.data_value(value);
123        }
124    }
125}
126
127impl<'a> Drop for SubscriptionDataNotifier<'a> {
128    fn drop(&mut self) {
129        for (sub_id, items) in std::mem::take(&mut self.by_subscription) {
130            let Some(session_id) = self.lock.subscription_to_session.get(&sub_id) else {
131                continue;
132            };
133            let Some(cache) = self.lock.session_subscriptions.get(session_id) else {
134                continue;
135            };
136            let mut cache_lck = cache.lock();
137            cache_lck.notify_data_changes(items);
138        }
139    }
140}
141
142/// Handle for notifying the subscription cache of a batch of events,
143/// without allocating NodeIds unnecessarily.
144/// Notifications are actually submitted once the notifier is dropped.
145pub struct SubscriptionEventNotifier<'a, 'b> {
146    lock: RwLockReadGuard<'a, SubscriptionCacheInner>,
147    by_subscription: HashMap<u32, Vec<(MonitoredItemHandle, &'b dyn Event)>>,
148}
149
150/// Notifier for a specific node emitting events.
151pub struct SubscriptionEventNotifierBatch<'a, 'b> {
152    // An event may notify on both the server, and an emitting node.
153    // So we may in some cases need two maps of monitored item entries.
154    items: &'a HashMap<MonitoredItemHandle, MonitoredItemEntry>,
155    items_2: Option<&'a HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
156    by_subscription: &'a mut HashMap<u32, Vec<(MonitoredItemHandle, &'b dyn Event)>>,
157}
158
159impl<'a, 'b> SubscriptionEventNotifierBatch<'a, 'b> {
160    /// Notify the referenced node of a new event.
161    pub fn event(&mut self, event: &'b dyn Event) {
162        for (handle, entry) in self
163            .items
164            .iter()
165            .chain(self.items_2.iter().flat_map(|v| v.iter()))
166        {
167            if !entry.enabled {
168                continue;
169            }
170            self.by_subscription
171                .entry(handle.subscription_id)
172                .or_default()
173                .push((*handle, event));
174        }
175    }
176}
177
178impl<'a, 'b> SubscriptionEventNotifier<'a, 'b> {
179    pub(super) fn new(lock: RwLockReadGuard<'a, SubscriptionCacheInner>) -> Self {
180        Self {
181            lock,
182            by_subscription: Default::default(),
183        }
184    }
185
186    /// Maybe get a notifier for the given node ID and attribute ID.
187    ///
188    /// This allows you to only sample when a user is listening.
189    ///
190    /// # Example
191    ///
192    /// ```ignore
193    /// if let Some(mut notif) = notifier.notify_for((1, 123)) {
194    ///     notif.event(&my_event);
195    ///     notif.event(&my_other_event);
196    /// }
197    /// ```
198    pub fn notify_for<'c>(
199        &'c mut self,
200        node_id: impl IntoNodeIdRef<'a>,
201    ) -> Option<SubscriptionEventNotifierBatch<'c, 'b>> {
202        let id_ref = node_id.into_node_id_ref();
203        let is_server = id_ref == ObjectId::Server;
204        let items = self.lock.monitored_items.get(&MonitoredItemKeyRef {
205            id: id_ref,
206            attribute_id: AttributeId::EventNotifier,
207        });
208        let server_items = if !is_server {
209            self.lock.monitored_items.get(&MonitoredItemKeyRef {
210                id: ObjectId::Server.into_node_id_ref(),
211                attribute_id: AttributeId::EventNotifier,
212            })
213        } else {
214            None
215        };
216
217        let (items, items_2) = match (items, server_items) {
218            (None, Some(v)) | (Some(v), None) => (v, None),
219            (Some(v), Some(v2)) => (v, Some(v2)),
220            (None, None) => return None,
221        };
222
223        Some(SubscriptionEventNotifierBatch {
224            items,
225            items_2,
226            by_subscription: &mut self.by_subscription,
227        })
228    }
229
230    /// Notify the subscription cache of a new event for the given node ID and attribute ID.
231    ///
232    /// # Arguments
233    ///
234    /// * `node_id` - The node ID or reference to node ID for the node emitting the event.
235    /// * `event` - The event to notify the server of.
236    pub fn notify(&mut self, node_id: impl IntoNodeIdRef<'a>, event: &'b dyn Event) {
237        if let Some(mut batch) = self.notify_for(node_id) {
238            batch.event(event);
239        }
240    }
241}
242
243impl<'a, 'b> Drop for SubscriptionEventNotifier<'a, 'b> {
244    fn drop(&mut self) {
245        for (sub_id, items) in std::mem::take(&mut self.by_subscription) {
246            let Some(session_id) = self.lock.subscription_to_session.get(&sub_id) else {
247                continue;
248            };
249            let Some(cache) = self.lock.session_subscriptions.get(session_id) else {
250                continue;
251            };
252            let mut cache_lck = cache.lock();
253            cache_lck.notify_events(items);
254        }
255    }
256}