Skip to main content

opcua_client/session/services/subscriptions/
mod.rs

1pub(crate) mod event_loop;
2pub use event_loop::SubscriptionActivity;
3
4mod callbacks;
5mod event_loop_state;
6mod service;
7pub(crate) mod state;
8
9pub use callbacks::{
10    DataChangeCallback, EventCallback, OnSubscriptionNotification, OnSubscriptionNotificationCore,
11    SubscriptionCallbacks,
12};
13use opcua_core::trace_lock;
14
15use std::{
16    collections::{BTreeSet, HashMap},
17    time::Duration,
18};
19
20use opcua_types::{
21    ExtensionObject, MonitoredItemCreateRequest, MonitoringMode, NotificationMessage, ReadValueId,
22};
23
24pub use service::{
25    CreateMonitoredItems, CreateSubscription, DeleteMonitoredItems, DeleteSubscriptions,
26    ModifyMonitoredItems, ModifySubscription, Publish, Republish, SetMonitoringMode,
27    SetPublishingMode, SetTriggering, TransferSubscriptions,
28};
29
30pub use event_loop_state::{SubscriptionCache, SubscriptionEventLoopState};
31
32use crate::session::services::subscriptions::{
33    service::CreatedMonitoredItem, state::SubscriptionState,
34};
35
36pub(crate) struct CreateMonitoredItem {
37    pub id: u32,
38    pub client_handle: u32,
39    pub item_to_monitor: ReadValueId,
40    pub monitoring_mode: MonitoringMode,
41    pub queue_size: u32,
42    pub discard_oldest: bool,
43    pub sampling_interval: f64,
44    pub filter: ExtensionObject,
45}
46
/// New settings for a monitored item that is already stored in a `Subscription`.
///
/// Consumed by `Subscription::modify_monitored_items`, which looks the item up
/// by `id` and overwrites its sampling interval and queue size.
pub(crate) struct ModifyMonitoredItem {
    /// Id of the monitored item to update.
    pub id: u32,
    /// Sampling interval to store on the item.
    pub sampling_interval: f64,
    /// Queue size to store on the item.
    pub queue_size: u32,
}
52
53#[derive(Debug, Clone)]
54/// Client-side representation of a monitored item.
55pub struct MonitoredItem {
56    /// This is the monitored item's id within the subscription
57    id: u32,
58    /// Monitored item's handle. Used internally - not modifiable
59    client_handle: u32,
60    // The thing that is actually being monitored - the node id, attribute, index, encoding.
61    item_to_monitor: ReadValueId,
62    /// Queue size
63    queue_size: usize,
64    /// Monitoring mode
65    monitoring_mode: MonitoringMode,
66    /// Sampling interval
67    sampling_interval: f64,
68    /// Triggered items
69    triggered_items: BTreeSet<u32>,
70    /// Whether to discard oldest values on queue overflow
71    discard_oldest: bool,
72    /// Active filter
73    filter: ExtensionObject,
74}
75
76impl MonitoredItem {
77    /// Create a new monitored item.
78    pub fn new(client_handle: u32) -> MonitoredItem {
79        MonitoredItem {
80            id: 0,
81            client_handle,
82            item_to_monitor: ReadValueId::default(),
83            queue_size: 1,
84            monitoring_mode: MonitoringMode::Reporting,
85            sampling_interval: 0.0,
86            triggered_items: BTreeSet::new(),
87            discard_oldest: true,
88            filter: ExtensionObject::null(),
89        }
90    }
91
92    /// Server assigned ID of the monitored item.
93    pub fn id(&self) -> u32 {
94        self.id
95    }
96
97    /// Client assigned handle for the monitored item.
98    pub fn client_handle(&self) -> u32 {
99        self.client_handle
100    }
101
102    /// Attribute and node ID for the item the monitored item receives notifications for.
103    pub fn item_to_monitor(&self) -> &ReadValueId {
104        &self.item_to_monitor
105    }
106
107    /// Sampling interval.
108    pub fn sampling_interval(&self) -> f64 {
109        self.sampling_interval
110    }
111
112    /// Queue size on the server.
113    pub fn queue_size(&self) -> usize {
114        self.queue_size
115    }
116
117    /// Whether the oldest values are discarded on queue overflow on the server.
118    pub fn discard_oldest(&self) -> bool {
119        self.discard_oldest
120    }
121
122    pub(crate) fn set_sampling_interval(&mut self, value: f64) {
123        self.sampling_interval = value;
124    }
125
126    pub(crate) fn set_queue_size(&mut self, value: usize) {
127        self.queue_size = value;
128    }
129
130    pub(crate) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
131        self.monitoring_mode = monitoring_mode;
132    }
133
134    pub(crate) fn set_triggering(&mut self, links_to_add: &[u32], links_to_remove: &[u32]) {
135        links_to_remove.iter().for_each(|i| {
136            self.triggered_items.remove(i);
137        });
138        links_to_add.iter().for_each(|i| {
139            self.triggered_items.insert(*i);
140        });
141    }
142
143    pub(crate) fn triggered_items(&self) -> &BTreeSet<u32> {
144        &self.triggered_items
145    }
146}
147
/// Key under which a `Subscription` stores a monitored item.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
enum MonitoredItemId {
    /// Placeholder id used for items inserted through
    /// `insert_temporary_monitored_item`, before a server id is known.
    Temporary(u32),
    /// Id of an item inserted through `insert_existing_monitored_item`,
    /// i.e. the value returned by `MonitoredItem::id`.
    Server(u32),
}
153
154/// Client-side representation of a subscription.
155pub struct Subscription {
156    /// Subscription id, supplied by server
157    subscription_id: u32,
158    /// Publishing interval in seconds
159    publishing_interval: Duration,
160    /// Lifetime count, revised by server
161    lifetime_count: u32,
162    /// Max keep alive count, revised by server
163    max_keep_alive_count: u32,
164    /// Max notifications per publish, revised by server
165    max_notifications_per_publish: u32,
166    /// Publishing enabled
167    publishing_enabled: bool,
168    /// Subscription priority
169    priority: u8,
170
171    /// A map of monitored items associated with the subscription (key = monitored_item_id)
172    monitored_items: HashMap<MonitoredItemId, MonitoredItem>,
173    /// A map of client handle to monitored item id
174    client_handles: HashMap<u32, MonitoredItemId>,
175
176    callback: Box<dyn OnSubscriptionNotificationCore>,
177}
178
179impl Subscription {
180    /// Creates a new subscription using the supplied parameters and the supplied data change callback.
181    #[allow(clippy::too_many_arguments)]
182    pub fn new(
183        subscription_id: u32,
184        publishing_interval: Duration,
185        lifetime_count: u32,
186        max_keep_alive_count: u32,
187        max_notifications_per_publish: u32,
188        priority: u8,
189        publishing_enabled: bool,
190        status_change_callback: Box<dyn OnSubscriptionNotificationCore>,
191    ) -> Subscription {
192        Subscription {
193            subscription_id,
194            publishing_interval,
195            lifetime_count,
196            max_keep_alive_count,
197            max_notifications_per_publish,
198            publishing_enabled,
199            priority,
200            monitored_items: HashMap::new(),
201            client_handles: HashMap::new(),
202            callback: status_change_callback,
203        }
204    }
205
206    /// Get the monitored items in this subscription.
207    pub fn monitored_items(&self) -> impl Iterator<Item = &MonitoredItem> {
208        self.monitored_items.values()
209    }
210
211    /// Get the subscription ID.
212    pub fn subscription_id(&self) -> u32 {
213        self.subscription_id
214    }
215
216    /// Get the configured publishing interval.
217    pub fn publishing_interval(&self) -> Duration {
218        self.publishing_interval
219    }
220
221    /// Get the `LifetimeCount` parameter for this subscription.
222    pub fn lifetime_count(&self) -> u32 {
223        self.lifetime_count
224    }
225
226    /// Get the configured priority.
227    pub fn priority(&self) -> u8 {
228        self.priority
229    }
230
231    /// Get the configured maximum keep alive count.
232    pub fn max_keep_alive_count(&self) -> u32 {
233        self.max_keep_alive_count
234    }
235
236    /// Get the configured maximum number of notifications per publish request.
237    pub fn max_notifications_per_publish(&self) -> u32 {
238        self.max_notifications_per_publish
239    }
240
241    /// Get whether publishing is enabled.
242    pub fn publishing_enabled(&self) -> bool {
243        self.publishing_enabled
244    }
245
246    /// Insert a monitored item that has been created on the server.
247    ///
248    /// If you call this yourself you are responsible for knowing that the
249    /// monitored item already exists.
250    pub fn insert_existing_monitored_item(&mut self, item: MonitoredItem) {
251        let client_handle = item.client_handle();
252        let monitored_item_id = item.id();
253        tracing::debug!(
254            "Inserting monitored item {} with client handle {}",
255            monitored_item_id,
256            client_handle
257        );
258        self.monitored_items
259            .insert(MonitoredItemId::Server(monitored_item_id), item);
260        self.client_handles
261            .insert(client_handle, MonitoredItemId::Server(monitored_item_id));
262    }
263
264    pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
265        self.publishing_interval = publishing_interval;
266    }
267
268    pub(crate) fn set_lifetime_count(&mut self, lifetime_count: u32) {
269        self.lifetime_count = lifetime_count;
270    }
271
272    pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
273        self.max_keep_alive_count = max_keep_alive_count;
274    }
275
276    pub(crate) fn set_max_notifications_per_publish(&mut self, max_notifications_per_publish: u32) {
277        self.max_notifications_per_publish = max_notifications_per_publish;
278    }
279
280    pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
281        self.publishing_enabled = publishing_enabled;
282    }
283
284    pub(crate) fn set_priority(&mut self, priority: u8) {
285        self.priority = priority;
286    }
287
288    pub(crate) fn insert_monitored_items(&mut self, items_to_create: Vec<CreateMonitoredItem>) {
289        items_to_create.into_iter().for_each(|i| {
290            let monitored_item = MonitoredItem {
291                id: i.id,
292                client_handle: i.client_handle,
293                item_to_monitor: i.item_to_monitor,
294                queue_size: i.queue_size as usize,
295                monitoring_mode: i.monitoring_mode,
296                sampling_interval: i.sampling_interval,
297                triggered_items: BTreeSet::new(),
298                discard_oldest: i.discard_oldest,
299                filter: i.filter,
300            };
301
302            self.insert_existing_monitored_item(monitored_item);
303        });
304    }
305
306    pub(crate) fn modify_monitored_items(&mut self, items_to_modify: &[ModifyMonitoredItem]) {
307        items_to_modify.iter().for_each(|i| {
308            if let Some(ref mut monitored_item) =
309                self.monitored_items.get_mut(&MonitoredItemId::Server(i.id))
310            {
311                monitored_item.set_sampling_interval(i.sampling_interval);
312                monitored_item.set_queue_size(i.queue_size as usize);
313            }
314        });
315    }
316
317    pub(crate) fn delete_monitored_items(&mut self, items_to_delete: &[u32]) {
318        items_to_delete.iter().for_each(|id| {
319            // Remove the monitored item and the client handle / id entry
320            if let Some(monitored_item) = self.monitored_items.remove(&MonitoredItemId::Server(*id))
321            {
322                let _ = self.client_handles.remove(&monitored_item.client_handle());
323            }
324        })
325    }
326
327    pub(crate) fn set_triggering(
328        &mut self,
329        triggering_item_id: u32,
330        links_to_add: &[u32],
331        links_to_remove: &[u32],
332    ) {
333        if let Some(ref mut monitored_item) = self
334            .monitored_items
335            .get_mut(&MonitoredItemId::Server(triggering_item_id))
336        {
337            monitored_item.set_triggering(links_to_add, links_to_remove);
338        }
339    }
340
341    pub(crate) fn on_notification(&mut self, notification: NotificationMessage) {
342        self.callback.on_subscription_notification(
343            notification,
344            MonitoredItemMap::new(&self.monitored_items, &self.client_handles),
345        );
346    }
347
348    fn clear_temporary_id(&mut self, temp_id: MonitoredItemId, remove_handle: bool) {
349        if let Some(monitored_item) = self.monitored_items.remove(&temp_id) {
350            if remove_handle {
351                let _ = self.client_handles.remove(&monitored_item.client_handle());
352            }
353        }
354    }
355
356    fn insert_temporary_monitored_item(&mut self, item: &TempMonitoredItem) {
357        let monitored_item = MonitoredItem {
358            id: 0,
359            client_handle: item.client_handle,
360            item_to_monitor: item.item_to_monitor.clone(),
361            queue_size: item.queue_size as usize,
362            monitoring_mode: item.monitoring_mode,
363            sampling_interval: item.sampling_interval,
364            triggered_items: BTreeSet::new(),
365            discard_oldest: item.discard_oldest,
366            filter: item.filter.clone(),
367        };
368
369        self.monitored_items
370            .insert(MonitoredItemId::Temporary(item.temp_id), monitored_item);
371        self.client_handles
372            .insert(item.client_handle, MonitoredItemId::Temporary(item.temp_id));
373    }
374}
375
376/// A map of monitored items associated with a subscription, allowing lookup by client handle.
377pub struct MonitoredItemMap<'a> {
378    /// A map of monitored items associated with the subscription (key = monitored_item_id)
379    monitored_items: &'a HashMap<MonitoredItemId, MonitoredItem>,
380    /// A map of client handle to monitored item id
381    client_handles: &'a HashMap<u32, MonitoredItemId>,
382}
383
384impl<'a> MonitoredItemMap<'a> {
385    fn new(
386        monitored_items: &'a HashMap<MonitoredItemId, MonitoredItem>,
387        client_handles: &'a HashMap<u32, MonitoredItemId>,
388    ) -> Self {
389        Self {
390            monitored_items,
391            client_handles,
392        }
393    }
394
395    /// Get a monitored item by its client handle.
396    pub fn get(&self, client_handle: u32) -> Option<&'a MonitoredItem> {
397        self.client_handles
398            .get(&client_handle)
399            .and_then(|id| self.monitored_items.get(id))
400    }
401}
402
/// Limits for publish requests, calculated based on the number of subscriptions
/// and the expected publish interval and message roundtrip time.
#[derive(Debug)]
pub struct PublishLimits {
    /// Latest message roundtrip time, never below `MIN_MESSAGE_ROUNDTRIP`.
    message_roundtrip: Duration,
    /// Publish interval passed to the last `update_subscriptions` call.
    publish_interval: Duration,
    /// Subscription count passed to the last `update_subscriptions` call.
    subscriptions: usize,
    /// `subscriptions * REQUESTS_PER_SUBSCRIPTION`.
    min_publish_requests: usize,
    /// `min_publish_requests` scaled by the number of publish intervals
    /// (rounded up) that fit in one message roundtrip.
    max_publish_requests: usize,
}

impl PublishLimits {
    /// Lower bound applied to every reported message roundtrip time.
    const MIN_MESSAGE_ROUNDTRIP: Duration = Duration::from_millis(10);
    /// Factor used to derive `min_publish_requests` from the subscription count.
    const REQUESTS_PER_SUBSCRIPTION: usize = 2;

    /// Create limits for a state without subscriptions: both limits are zero.
    pub(crate) fn new() -> Self {
        Self {
            message_roundtrip: Self::MIN_MESSAGE_ROUNDTRIP,
            publish_interval: Duration::ZERO,
            subscriptions: 0,
            min_publish_requests: 0,
            max_publish_requests: 0,
        }
    }

    /// Record a new message roundtrip time and recalculate the limits.
    ///
    /// Values below `MIN_MESSAGE_ROUNDTRIP` are raised to that minimum.
    pub(crate) fn update_message_roundtrip(&mut self, message_roundtrip: Duration) {
        self.message_roundtrip = message_roundtrip.max(Self::MIN_MESSAGE_ROUNDTRIP);
        self.calculate_publish_limits();
    }

    /// Record a new subscription count and publish interval and recalculate
    /// the limits.
    pub(crate) fn update_subscriptions(
        &mut self,
        subscriptions: usize,
        publish_interval: Duration,
    ) {
        self.subscriptions = subscriptions;
        self.publish_interval = publish_interval;
        self.calculate_publish_limits();
    }

    /// Recompute `min_publish_requests` and `max_publish_requests` from the
    /// stored roundtrip time, publish interval and subscription count.
    ///
    /// Both durations are truncated to whole milliseconds. A publish interval
    /// below one millisecond therefore makes the ratio infinite, in which case
    /// the maximum saturates at `usize::MAX` (or stays `0` when there are no
    /// subscriptions) instead of overflowing.
    fn calculate_publish_limits(&mut self) {
        self.min_publish_requests = self
            .subscriptions
            .saturating_mul(Self::REQUESTS_PER_SUBSCRIPTION);

        // Float-to-integer `as` casts saturate, so an infinite ratio becomes
        // `usize::MAX` here rather than undefined behaviour.
        let intervals_per_roundtrip = (self.message_roundtrip.as_millis() as f32
            / self.publish_interval.as_millis() as f32)
            .ceil() as usize;

        // A plain `*` would overflow (panic in debug builds, wrap in release)
        // as soon as the ratio saturated and there is at least one subscription.
        self.max_publish_requests =
            intervals_per_roundtrip.saturating_mul(self.min_publish_requests);
    }
}
451
452struct TempMonitoredItem {
453    temp_id: u32,
454    client_handle: u32,
455    item_to_monitor: ReadValueId,
456    queue_size: u32,
457    monitoring_mode: MonitoringMode,
458    sampling_interval: f64,
459    filter: ExtensionObject,
460    discard_oldest: bool,
461}
462
463/// A helper struct to manage insertion of monitored items into a subscription.
464/// This ensures that monitored items exist in the subscription state in a
465/// temporary state until the server has confirmed their creation.
466///
467/// This avoids race conditions where a monitored item gets notifications
468/// before it is stored in the subscription state,
469/// but also lets us avoid locking the subscription state while waiting
470/// for the server response.
471///
472/// To use, simple construct it with `new`, then call `finish`
473/// if the monitored items were created successfully. If the struct is
474/// dropped without calling `finish`, the temporary monitored items
475/// will be removed from the subscription state along with their handles.
476pub struct PreInsertMonitoredItems<'a> {
477    temp_ids: Vec<MonitoredItemTempResult>,
478    subscription_id: u32,
479    lock: &'a opcua_core::sync::Mutex<SubscriptionState>,
480}
481
482struct MonitoredItemTempResult {
483    temp_id: MonitoredItemId,
484    created: bool,
485}
486
487impl<'a> PreInsertMonitoredItems<'a> {
488    /// Create a new PreInsertMonitoredItems helper.
489    /// This inserts temporary monitored items into the subscription state.
490    pub fn new(
491        lock: &'a opcua_core::sync::Mutex<SubscriptionState>,
492        subscription_id: u32,
493        items: &[MonitoredItemCreateRequest],
494    ) -> Self {
495        let mut lck = trace_lock!(lock);
496
497        let to_insert: Vec<_> = items
498            .iter()
499            .map(|item| TempMonitoredItem {
500                temp_id: lck.next_temp_id(),
501                client_handle: item.requested_parameters.client_handle,
502                item_to_monitor: item.item_to_monitor.clone(),
503                queue_size: item.requested_parameters.queue_size,
504                monitoring_mode: item.monitoring_mode,
505                sampling_interval: item.requested_parameters.sampling_interval,
506                filter: item.requested_parameters.filter.clone(),
507                discard_oldest: item.requested_parameters.discard_oldest,
508            })
509            .collect();
510
511        let ids = to_insert
512            .iter()
513            .map(|i| MonitoredItemTempResult {
514                temp_id: MonitoredItemId::Temporary(i.temp_id),
515                created: false,
516            })
517            .collect();
518
519        lck.insert_temporary_monitored_items(&to_insert, subscription_id);
520        Self {
521            subscription_id,
522            temp_ids: ids,
523            lock,
524        }
525    }
526
527    /// Finish the monitored item creation.
528    /// This inserts the created monitored items into the subscription state.
529    pub fn finish(mut self, results: &[CreatedMonitoredItem]) {
530        let mut lck = trace_lock!(self.lock);
531        let mut items_to_create = Vec::with_capacity(results.len());
532        for (temp_id, item) in self.temp_ids.iter_mut().zip(results.iter()) {
533            if item.result.status_code.is_good() {
534                temp_id.created = true;
535                items_to_create.push(CreateMonitoredItem {
536                    id: item.result.monitored_item_id,
537                    client_handle: item.requested_parameters.client_handle,
538                    discard_oldest: item.requested_parameters.discard_oldest,
539                    item_to_monitor: item.item_to_monitor.clone(),
540                    monitoring_mode: item.monitoring_mode,
541                    queue_size: item.result.revised_queue_size,
542                    sampling_interval: item.result.revised_sampling_interval,
543                    filter: item.requested_parameters.filter.clone(),
544                });
545            }
546        }
547
548        lck.insert_monitored_items(self.subscription_id, items_to_create);
549    }
550}
551
552impl Drop for PreInsertMonitoredItems<'_> {
553    fn drop(&mut self) {
554        let mut lck = trace_lock!(self.lock);
555        lck.clear_temporary_ids(&self.temp_ids, self.subscription_id);
556    }
557}