opcua_server/subscriptions/
notify.rs1use hashbrown::HashMap;
2use opcua_nodes::Event;
3use opcua_types::{node_id::IntoNodeIdRef, AttributeId, DataValue, DateTime, ObjectId, Variant};
4use parking_lot::RwLockReadGuard;
5
6use crate::{
7 subscriptions::{MonitoredItemEntry, MonitoredItemKeyRef, SubscriptionCacheInner},
8 MonitoredItemHandle,
9};
10
11pub struct SubscriptionDataNotifier<'a> {
15 lock: RwLockReadGuard<'a, SubscriptionCacheInner>,
16 by_subscription: HashMap<u32, Vec<(MonitoredItemHandle, DataValue)>>,
17}
18
19pub struct SubscriptionDataNotifierBatch<'a> {
21 items: &'a HashMap<MonitoredItemHandle, MonitoredItemEntry>,
22 by_subscription: &'a mut HashMap<u32, Vec<(MonitoredItemHandle, DataValue)>>,
23}
24
25impl<'a> SubscriptionDataNotifierBatch<'a> {
26 pub fn data_value(&mut self, value: impl Into<DataValue>) {
28 let dv = value.into();
29 for (handle, entry) in self.items {
30 if !entry.enabled {
31 continue;
32 }
33 self.by_subscription
34 .entry(handle.subscription_id)
35 .or_default()
36 .push((*handle, dv.clone()));
37 }
38 }
39
40 pub fn data_value_to_item(
42 &mut self,
43 value: impl Into<DataValue>,
44 handle: &MonitoredItemHandle,
45 ) {
46 self.by_subscription
47 .entry(handle.subscription_id)
48 .or_default()
49 .push((*handle, value.into()));
50 }
51
52 pub fn value(&mut self, value: impl Into<Variant>, source_timestamp: DateTime) {
54 let dv = DataValue::new_at(value, source_timestamp);
55 self.data_value(dv);
56 }
57
58 pub fn entries<'b>(
63 &'b self,
64 ) -> impl Iterator<Item = (&'a MonitoredItemHandle, &'a MonitoredItemEntry)> + 'a {
65 self.items.iter().filter(|e| e.1.enabled)
66 }
67}
68
69impl<'a> SubscriptionDataNotifier<'a> {
70 pub(super) fn new(lock: RwLockReadGuard<'a, SubscriptionCacheInner>) -> Self {
71 Self {
72 lock,
73 by_subscription: Default::default(),
74 }
75 }
76
77 pub fn notify_for<'b, 'c>(
90 &'b mut self,
91 node_id: impl IntoNodeIdRef<'c>,
92 attribute_id: AttributeId,
93 ) -> Option<SubscriptionDataNotifierBatch<'b>> {
94 if attribute_id == AttributeId::EventNotifier {
95 return None;
96 }
97
98 let items = self.lock.monitored_items.get(&MonitoredItemKeyRef {
99 id: node_id.into_node_id_ref(),
100 attribute_id,
101 })?;
102 Some(SubscriptionDataNotifierBatch {
103 items,
104 by_subscription: &mut self.by_subscription,
105 })
106 }
107
108 pub fn notify(
116 &mut self,
117 node_id: impl IntoNodeIdRef<'a>,
118 attribute_id: AttributeId,
119 value: impl Into<DataValue>,
120 ) {
121 if let Some(mut batch) = self.notify_for(node_id, attribute_id) {
122 batch.data_value(value);
123 }
124 }
125}
126
127impl<'a> Drop for SubscriptionDataNotifier<'a> {
128 fn drop(&mut self) {
129 for (sub_id, items) in std::mem::take(&mut self.by_subscription) {
130 let Some(session_id) = self.lock.subscription_to_session.get(&sub_id) else {
131 continue;
132 };
133 let Some(cache) = self.lock.session_subscriptions.get(session_id) else {
134 continue;
135 };
136 let mut cache_lck = cache.lock();
137 cache_lck.notify_data_changes(items);
138 }
139 }
140}
141
142pub struct SubscriptionEventNotifier<'a, 'b> {
146 lock: RwLockReadGuard<'a, SubscriptionCacheInner>,
147 by_subscription: HashMap<u32, Vec<(MonitoredItemHandle, &'b dyn Event)>>,
148}
149
150pub struct SubscriptionEventNotifierBatch<'a, 'b> {
152 items: &'a HashMap<MonitoredItemHandle, MonitoredItemEntry>,
155 items_2: Option<&'a HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
156 by_subscription: &'a mut HashMap<u32, Vec<(MonitoredItemHandle, &'b dyn Event)>>,
157}
158
159impl<'a, 'b> SubscriptionEventNotifierBatch<'a, 'b> {
160 pub fn event(&mut self, event: &'b dyn Event) {
162 for (handle, entry) in self
163 .items
164 .iter()
165 .chain(self.items_2.iter().flat_map(|v| v.iter()))
166 {
167 if !entry.enabled {
168 continue;
169 }
170 self.by_subscription
171 .entry(handle.subscription_id)
172 .or_default()
173 .push((*handle, event));
174 }
175 }
176}
177
178impl<'a, 'b> SubscriptionEventNotifier<'a, 'b> {
179 pub(super) fn new(lock: RwLockReadGuard<'a, SubscriptionCacheInner>) -> Self {
180 Self {
181 lock,
182 by_subscription: Default::default(),
183 }
184 }
185
186 pub fn notify_for<'c>(
199 &'c mut self,
200 node_id: impl IntoNodeIdRef<'a>,
201 ) -> Option<SubscriptionEventNotifierBatch<'c, 'b>> {
202 let id_ref = node_id.into_node_id_ref();
203 let is_server = id_ref == ObjectId::Server;
204 let items = self.lock.monitored_items.get(&MonitoredItemKeyRef {
205 id: id_ref,
206 attribute_id: AttributeId::EventNotifier,
207 });
208 let server_items = if !is_server {
209 self.lock.monitored_items.get(&MonitoredItemKeyRef {
210 id: ObjectId::Server.into_node_id_ref(),
211 attribute_id: AttributeId::EventNotifier,
212 })
213 } else {
214 None
215 };
216
217 let (items, items_2) = match (items, server_items) {
218 (None, Some(v)) | (Some(v), None) => (v, None),
219 (Some(v), Some(v2)) => (v, Some(v2)),
220 (None, None) => return None,
221 };
222
223 Some(SubscriptionEventNotifierBatch {
224 items,
225 items_2,
226 by_subscription: &mut self.by_subscription,
227 })
228 }
229
230 pub fn notify(&mut self, node_id: impl IntoNodeIdRef<'a>, event: &'b dyn Event) {
237 if let Some(mut batch) = self.notify_for(node_id) {
238 batch.event(event);
239 }
240 }
241}
242
243impl<'a, 'b> Drop for SubscriptionEventNotifier<'a, 'b> {
244 fn drop(&mut self) {
245 for (sub_id, items) in std::mem::take(&mut self.by_subscription) {
246 let Some(session_id) = self.lock.subscription_to_session.get(&sub_id) else {
247 continue;
248 };
249 let Some(cache) = self.lock.session_subscriptions.get(session_id) else {
250 continue;
251 };
252 let mut cache_lck = cache.lock();
253 cache_lck.notify_events(items);
254 }
255 }
256}