Skip to main content

opcua_server/subscriptions/
subscription.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    time::{Duration, Instant},
4};
5
6use opcua_core::handle::Handle;
7use opcua_nodes::{Event, TypeTree};
8use opcua_types::{DataValue, DateTime, DateTimeUtc, NotificationMessage, StatusCode};
9use tracing::{debug, trace, warn};
10
11use super::monitored_item::{MonitoredItem, Notification};
12
13#[derive(Debug, Copy, Clone, PartialEq)]
14/// Current internal state of the subscription.
15pub enum SubscriptionState {
16    /// The subscription has been closed and will be removed soon.
17    Closed,
18    /// The subscription is being created.
19    Creating,
20    /// The subscription is operating normally.
21    Normal,
22    /// The subscription is waiting for publish requests that are
23    /// not arriving as expected.
24    Late,
25    /// The subscription is sending keep alives because no
26    /// data is being produced.
27    KeepAlive,
28}
29
30#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
31/// Unique identifier for a monitored item on the server.
32pub struct MonitoredItemHandle {
33    /// Subscription this monitored item belongs to.
34    pub subscription_id: u32,
35    /// ID of this monitored item.
36    pub monitored_item_id: u32,
37}
38
39#[derive(Debug)]
40pub(crate) struct SubscriptionStateParams {
41    pub notifications_available: bool,
42    pub more_notifications: bool,
43    pub publishing_req_queued: bool,
44}
45
46#[derive(Debug, Copy, Clone, PartialEq, Eq)]
47pub(crate) enum UpdateStateAction {
48    None,
49    // Return a keep alive
50    ReturnKeepAlive,
51    // Return notifications
52    ReturnNotifications,
53    // The subscription was created normally
54    SubscriptionCreated,
55    // The subscription has expired and must be closed
56    SubscriptionExpired,
57}
58
59#[derive(Debug, Copy, Clone, PartialEq, Eq)]
60pub(super) enum TickResult {
61    Expired,
62    Enqueued,
63    None,
64}
65
66/// This is for debugging purposes. It allows the caller to validate the output state if required.
67///
68/// Values correspond to state table in OPC UA Part 4 5.13.1.2
69///
70#[derive(Debug, Copy, Clone, PartialEq)]
71pub(crate) enum HandledState {
72    None0 = 0,
73    Create3 = 3,
74    Normal4 = 4,
75    Normal5 = 5,
76    IntervalElapsed6 = 6,
77    IntervalElapsed7 = 7,
78    IntervalElapsed8 = 8,
79    IntervalElapsed9 = 9,
80    Late10 = 10,
81    Late11 = 11,
82    Late12 = 12,
83    KeepAlive13 = 13,
84    KeepAlive14 = 14,
85    KeepAlive15 = 15,
86    KeepAlive16 = 16,
87    KeepAlive17 = 17,
88    Closed27 = 27,
89}
90
91#[derive(Debug)]
92/// A single subscription maintained by the server.
93pub struct Subscription {
94    id: u32,
95    publishing_interval: Duration,
96    max_lifetime_counter: u32,
97    max_keep_alive_counter: u32,
98    priority: u8,
99    monitored_items: HashMap<u32, MonitoredItem>,
100    /// Monitored items that have seen notifications.
101    notified_monitored_items: HashSet<u32>,
102    /// State of the subscription
103    state: SubscriptionState,
104    /// A value that contains the number of consecutive publishing timer expirations without Client
105    /// activity before the Subscription is terminated.
106    lifetime_counter: u32,
107    /// Keep alive counter decrements when there are no notifications to publish and when it expires
108    /// requests to send an empty notification as a keep alive event
109    keep_alive_counter: u32,
110    /// boolean value that is set to true to mean that either a NotificationMessage or a keep-alive
111    /// Message has been sent on the Subscription. It is a flag that is used to ensure that either
112    /// a NotificationMessage or a keep-alive Message is sent out the first time the publishing timer
113    /// expires.
114    first_message_sent: bool,
115    /// The parameter that requests publishing to be enabled or disabled.
116    publishing_enabled: bool,
117    /// A flag that tells the subscription to send the latest value of every monitored item on the
118    /// next publish request.
119    resend_data: bool,
120    /// The next sequence number to be sent
121    sequence_number: Handle,
122    // The time that the subscription interval last fired
123    last_time_publishing_interval_elapsed: Instant,
124    // Currently outstanding notifications to send
125    notifications: VecDeque<NotificationMessage>,
126    /// Maximum number of queued notifications.
127    max_queued_notifications: usize,
128    /// Maximum number of notifications per publish.
129    max_notifications_per_publish: usize,
130}
131
132#[derive(Debug, Copy, Clone, PartialEq)]
133pub(crate) enum TickReason {
134    ReceivePublishRequest,
135    TickTimerFired,
136}
137
138impl Subscription {
139    #[allow(clippy::too_many_arguments)]
140    pub(super) fn new(
141        id: u32,
142        publishing_enabled: bool,
143        publishing_interval: Duration,
144        lifetime_counter: u32,
145        keep_alive_counter: u32,
146        priority: u8,
147        max_queued_notifications: usize,
148        max_notifications_per_publish: u64,
149    ) -> Self {
150        Self {
151            id,
152            publishing_interval,
153            max_lifetime_counter: lifetime_counter,
154            max_keep_alive_counter: keep_alive_counter,
155            priority,
156            monitored_items: HashMap::new(),
157            notified_monitored_items: HashSet::new(),
158            // State variables
159            state: SubscriptionState::Creating,
160            lifetime_counter,
161            keep_alive_counter,
162            first_message_sent: false,
163            resend_data: false,
164            publishing_enabled,
165            // Counters for new items
166            sequence_number: Handle::new(1),
167            last_time_publishing_interval_elapsed: Instant::now(),
168            notifications: VecDeque::new(),
169            max_queued_notifications,
170            max_notifications_per_publish: max_notifications_per_publish as usize,
171        }
172    }
173
174    /// Get the number of monitored items in this subscription.
175    pub fn len(&self) -> usize {
176        self.monitored_items.len()
177    }
178
179    /// Return whether the subscription has no monitored items.
180    pub fn is_empty(&self) -> bool {
181        self.monitored_items.is_empty()
182    }
183
184    pub(super) fn get_mut(&mut self, id: &u32) -> Option<&mut MonitoredItem> {
185        self.monitored_items.get_mut(id)
186    }
187
188    /// Get a reference to a monitored item managed by this subscription.
189    pub fn get(&self, id: &u32) -> Option<&MonitoredItem> {
190        self.monitored_items.get(id)
191    }
192
193    /// Return whether the subscription contains the given monitored item ID.
194    pub fn contains_key(&self, id: &u32) -> bool {
195        self.monitored_items.contains_key(id)
196    }
197
198    /// Iterate over the monitored items in the subscription.
199    pub fn items(&self) -> impl Iterator<Item = &MonitoredItem> {
200        self.monitored_items.values()
201    }
202
203    pub(super) fn drain(&mut self) -> impl Iterator<Item = (u32, MonitoredItem)> + '_ {
204        self.monitored_items.drain()
205    }
206
207    /// Set `resend_data`. The next publish request will send values for all
208    /// monitored items, whether or not they have produced any new data.
209    pub fn set_resend_data(&mut self) {
210        self.resend_data = true;
211    }
212
213    pub(super) fn remove(&mut self, id: &u32) -> Option<MonitoredItem> {
214        self.monitored_items.remove(id)
215    }
216
217    pub(super) fn insert(&mut self, id: u32, item: MonitoredItem) {
218        self.monitored_items.insert(id, item);
219        self.notified_monitored_items.insert(id);
220    }
221
222    /// Notify the given monitored item of a new data value.
223    pub fn notify_data_value(&mut self, id: &u32, value: DataValue, now: &DateTime) {
224        if let Some(item) = self.monitored_items.get_mut(id) {
225            if item.notify_data_value(value, now, false) {
226                self.notified_monitored_items.insert(*id);
227            }
228        }
229    }
230
231    /// Notify the given monitored item of a new event.
232    pub fn notify_event(&mut self, id: &u32, event: &dyn Event, type_tree: &dyn TypeTree) {
233        if let Some(item) = self.monitored_items.get_mut(id) {
234            if item.notify_event(event, type_tree) {
235                self.notified_monitored_items.insert(*id);
236            }
237        }
238    }
239
240    /// Tests if the publishing interval has elapsed since the last time this function in which case
241    /// it returns `true` and updates its internal state.
242    fn test_and_set_publishing_interval_elapsed(&mut self, now: Instant) -> bool {
243        // Look at the last expiration time compared to now and see if it matches
244        // or exceeds the publishing interval
245        let elapsed = now - self.last_time_publishing_interval_elapsed;
246        if elapsed >= self.publishing_interval {
247            self.last_time_publishing_interval_elapsed = now;
248            true
249        } else {
250            false
251        }
252    }
253
254    fn get_state_transition(
255        &self,
256        tick_reason: TickReason,
257        p: SubscriptionStateParams,
258    ) -> HandledState {
259        // The full state transition table from Part 4 5.13.1.
260        // Note that the exact layout here is written to be as close as possible to the state transition
261        // table. Avoid changing it to clean it up or remove redundant checks. To make it easier to debug,
262        // it should be as one-to-one with the original document as possible.
263        #[allow(clippy::nonminimal_bool)]
264        match (self.state, tick_reason) {
265            (SubscriptionState::Creating, _) => HandledState::Create3,
266            (SubscriptionState::Normal, TickReason::ReceivePublishRequest)
267                if self.publishing_enabled || !self.publishing_enabled && !p.more_notifications =>
268            {
269                HandledState::Normal4
270            }
271            (SubscriptionState::Normal, TickReason::ReceivePublishRequest)
272                if self.publishing_enabled && p.more_notifications =>
273            {
274                HandledState::Normal5
275            }
276            (SubscriptionState::Normal, TickReason::TickTimerFired)
277                if p.publishing_req_queued
278                    && self.publishing_enabled
279                    && p.notifications_available =>
280            {
281                HandledState::IntervalElapsed6
282            }
283            (SubscriptionState::Normal, TickReason::TickTimerFired)
284                if p.publishing_req_queued
285                    && !self.first_message_sent
286                    && (!self.publishing_enabled
287                        || self.publishing_enabled && !p.more_notifications) =>
288            {
289                HandledState::IntervalElapsed7
290            }
291            (SubscriptionState::Normal, TickReason::TickTimerFired)
292                if !p.publishing_req_queued
293                    && (!self.first_message_sent
294                        || self.publishing_enabled && p.notifications_available) =>
295            {
296                HandledState::IntervalElapsed8
297            }
298            (SubscriptionState::Normal, TickReason::TickTimerFired)
299                if self.first_message_sent
300                    && (!self.publishing_enabled
301                        || self.publishing_enabled && !p.more_notifications) =>
302            {
303                HandledState::IntervalElapsed9
304            }
305            (SubscriptionState::Late, TickReason::ReceivePublishRequest)
306                if self.publishing_enabled
307                    && (p.notifications_available || p.more_notifications) =>
308            {
309                HandledState::Late10
310            }
311            (SubscriptionState::Late, TickReason::ReceivePublishRequest)
312                if !self.publishing_enabled
313                    || self.publishing_enabled
314                        && !p.notifications_available
315                        && !p.more_notifications =>
316            {
317                HandledState::Late11
318            }
319            // This check is not in the spec, but without it the lifetime counter won't behave properly.
320            // This is probably an error in the standard.
321            (SubscriptionState::Late, TickReason::TickTimerFired) if self.lifetime_counter > 1 => {
322                HandledState::Late12
323            }
324            (SubscriptionState::KeepAlive, TickReason::ReceivePublishRequest) => {
325                HandledState::KeepAlive13
326            }
327            (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
328                if self.publishing_enabled
329                    && p.notifications_available
330                    && p.publishing_req_queued =>
331            {
332                HandledState::KeepAlive14
333            }
334            (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
335                if p.publishing_req_queued
336                    && self.keep_alive_counter == 1
337                    && (!self.publishing_enabled
338                        || self.publishing_enabled && !p.notifications_available) =>
339            {
340                HandledState::KeepAlive15
341            }
342            (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
343                if self.keep_alive_counter > 1
344                    && (!self.publishing_enabled
345                        || self.publishing_enabled && !p.notifications_available) =>
346            {
347                HandledState::KeepAlive16
348            }
349            (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
350                if !p.publishing_req_queued
351                    && (self.keep_alive_counter == 1
352                        || self.keep_alive_counter > 1
353                            && self.publishing_enabled
354                            && p.notifications_available) =>
355            {
356                HandledState::KeepAlive17
357            }
358            // Late is unreachable in the next state.
359            (
360                SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive,
361                TickReason::TickTimerFired,
362            ) if self.lifetime_counter <= 1 => HandledState::Closed27,
363            _ => HandledState::None0,
364        }
365    }
366
367    fn handle_state_transition(&mut self, transition: HandledState) -> UpdateStateAction {
368        match transition {
369            HandledState::None0 => UpdateStateAction::None,
370            HandledState::Create3 => {
371                self.state = SubscriptionState::Normal;
372                self.first_message_sent = false;
373                UpdateStateAction::SubscriptionCreated
374            }
375            HandledState::Normal4 => {
376                // Publish req queued at session level.
377                UpdateStateAction::None
378            }
379            HandledState::Normal5 => {
380                self.reset_lifetime_counter();
381                UpdateStateAction::ReturnNotifications
382            }
383            HandledState::IntervalElapsed6 => {
384                self.reset_lifetime_counter();
385                self.start_publishing_timer();
386                self.first_message_sent = true;
387                UpdateStateAction::ReturnNotifications
388            }
389            HandledState::IntervalElapsed7 => {
390                self.reset_lifetime_counter();
391                self.start_publishing_timer();
392                self.first_message_sent = true;
393                UpdateStateAction::ReturnKeepAlive
394            }
395            HandledState::IntervalElapsed8 => {
396                self.start_publishing_timer();
397                self.state = SubscriptionState::Late;
398                UpdateStateAction::None
399            }
400            HandledState::IntervalElapsed9 => {
401                self.start_publishing_timer();
402                self.reset_keep_alive_counter();
403                self.state = SubscriptionState::KeepAlive;
404                UpdateStateAction::None
405            }
406            HandledState::Late10 => {
407                self.reset_lifetime_counter();
408                self.first_message_sent = true;
409                self.state = SubscriptionState::Normal;
410                UpdateStateAction::ReturnNotifications
411            }
412            HandledState::Late11 => {
413                self.reset_lifetime_counter();
414                self.first_message_sent = true;
415                self.state = SubscriptionState::KeepAlive;
416                UpdateStateAction::ReturnKeepAlive
417            }
418            HandledState::Late12 => {
419                self.start_publishing_timer();
420                self.state = SubscriptionState::Late;
421                UpdateStateAction::None
422            }
423            HandledState::KeepAlive13 => {
424                // No-op, publish req enqueued at session level.
425                UpdateStateAction::None
426            }
427            HandledState::KeepAlive14 => {
428                self.reset_lifetime_counter();
429                self.start_publishing_timer();
430                self.first_message_sent = true;
431                self.state = SubscriptionState::Normal;
432                UpdateStateAction::ReturnNotifications
433            }
434            HandledState::KeepAlive15 => {
435                self.start_publishing_timer();
436                self.reset_keep_alive_counter();
437                UpdateStateAction::ReturnKeepAlive
438            }
439            HandledState::KeepAlive16 => {
440                self.start_publishing_timer();
441                self.keep_alive_counter -= 1;
442                UpdateStateAction::None
443            }
444            HandledState::KeepAlive17 => {
445                self.start_publishing_timer();
446                self.state = SubscriptionState::Late;
447                UpdateStateAction::None
448            }
449            HandledState::Closed27 => {
450                self.state = SubscriptionState::Closed;
451                UpdateStateAction::SubscriptionExpired
452            }
453        }
454    }
455
456    fn notifications_available(&self, resend_data: bool) -> bool {
457        if !self.notified_monitored_items.is_empty() {
458            true
459        } else if resend_data {
460            self.monitored_items.iter().any(|it| it.1.has_last_value())
461        } else {
462            false
463        }
464    }
465
466    pub(super) fn tick(
467        &mut self,
468        now: &DateTimeUtc,
469        now_instant: Instant,
470        tick_reason: TickReason,
471        publishing_req_queued: bool,
472    ) -> TickResult {
473        let publishing_interval_elapsed = match tick_reason {
474            TickReason::ReceivePublishRequest => false,
475            TickReason::TickTimerFired => {
476                if self.state == SubscriptionState::Creating {
477                    true
478                } else {
479                    self.test_and_set_publishing_interval_elapsed(now_instant)
480                }
481            }
482        };
483
484        // We're not actually doing anything in this case.
485        if matches!(tick_reason, TickReason::TickTimerFired) && !publishing_interval_elapsed {
486            return TickResult::None;
487        }
488        // First, get the actual state transition we're in.
489        let transition = self.get_state_transition(
490            tick_reason,
491            SubscriptionStateParams {
492                notifications_available: self.notifications_available(self.resend_data),
493                more_notifications: !self.notifications.is_empty(),
494                publishing_req_queued,
495            },
496        );
497        let action = self.handle_state_transition(transition);
498
499        match action {
500            UpdateStateAction::None => TickResult::None,
501            UpdateStateAction::ReturnKeepAlive => {
502                let notification = NotificationMessage::keep_alive(
503                    // OPC-UA part 4 5.13.1.1
504                    // "Each keep-alive Message is a response to a Publish request in which the notificationMessage parameter does not
505                    // contain any Notifications and that contains the sequence number of the next NotificationMessage that is to be sent."
506                    // Very vague, but the correct interpretation appears to be to not increment the sequence number.
507                    self.sequence_number.peek_next(),
508                    DateTime::from(*now),
509                );
510                self.enqueue_notification(notification);
511                TickResult::Enqueued
512            }
513            UpdateStateAction::ReturnNotifications => {
514                let resend_data = std::mem::take(&mut self.resend_data);
515                let messages = self.tick_monitored_items(now, resend_data);
516                for msg in messages {
517                    self.enqueue_notification(msg);
518                }
519                TickResult::Enqueued
520            }
521            UpdateStateAction::SubscriptionCreated => TickResult::None,
522            UpdateStateAction::SubscriptionExpired => {
523                debug!("Subscription status change to closed / timeout");
524                self.monitored_items.clear();
525                let notification = NotificationMessage::status_change(
526                    self.sequence_number.next(),
527                    DateTime::from(*now),
528                    StatusCode::BadTimeout,
529                );
530                self.enqueue_notification(notification);
531                TickResult::Expired
532            }
533        }
534    }
535
536    fn enqueue_notification(&mut self, notification: NotificationMessage) {
537        if self.notifications.len() >= self.max_queued_notifications {
538            warn!("Maximum number of queued notifications exceeded, dropping oldest. Subscription ID: {}", self.id);
539            self.notifications.pop_front();
540        }
541
542        // debug!("Enqueuing notification {:?}", notification);
543        self.notifications.push_back(notification);
544    }
545
546    pub(super) fn take_notification(&mut self) -> Option<NotificationMessage> {
547        self.notifications.pop_front()
548    }
549
550    pub(super) fn more_notifications(&self) -> bool {
551        !self.notifications.is_empty()
552    }
553
554    pub(super) fn ready_to_remove(&self) -> bool {
555        self.state == SubscriptionState::Closed && self.notifications.is_empty()
556    }
557
558    fn handle_triggers(
559        &mut self,
560        now: &DateTimeUtc,
561        triggers: Vec<(u32, u32)>,
562        notifications: &mut Vec<Notification>,
563        messages: &mut Vec<NotificationMessage>,
564    ) {
565        for (triggering_item, item_id) in triggers {
566            let Some(item) = self.monitored_items.get_mut(&item_id) else {
567                if let Some(item) = self.monitored_items.get_mut(&triggering_item) {
568                    item.remove_dead_trigger(item_id);
569                }
570                continue;
571            };
572
573            while let Some(notif) = item.pop_notification() {
574                notifications.push(notif);
575                if notifications.len() >= self.max_notifications_per_publish
576                    && self.max_notifications_per_publish > 0
577                {
578                    messages.push(Self::make_notification_message(
579                        self.sequence_number.next(),
580                        std::mem::take(notifications),
581                        now,
582                    ));
583                }
584            }
585        }
586    }
587
588    fn make_notification_message(
589        next_sequence_number: u32,
590        notifications: Vec<Notification>,
591        now: &DateTimeUtc,
592    ) -> NotificationMessage {
593        let mut data_change_notifications = Vec::new();
594        let mut event_notifications = Vec::new();
595
596        for notif in notifications {
597            match notif {
598                Notification::MonitoredItemNotification(n) => data_change_notifications.push(n),
599                Notification::Event(n) => event_notifications.push(n),
600            }
601        }
602
603        NotificationMessage::data_change(
604            next_sequence_number,
605            DateTime::from(*now),
606            data_change_notifications,
607            event_notifications,
608        )
609    }
610
611    #[allow(clippy::too_many_arguments)]
612    fn tick_monitored_item(
613        monitored_item: &mut MonitoredItem,
614        now: &DateTimeUtc,
615        resend_data: bool,
616        max_notifications: usize,
617        triggers: &mut Vec<(u32, u32)>,
618        notifications: &mut Vec<Notification>,
619        messages: &mut Vec<NotificationMessage>,
620        sequence_numbers: &mut Handle,
621    ) {
622        monitored_item.maybe_enqueue_skipped_value(&(*now).into());
623
624        if monitored_item.is_sampling() && monitored_item.has_new_notifications() {
625            triggers.extend(
626                monitored_item
627                    .triggered_items()
628                    .iter()
629                    .copied()
630                    .map(|id| (monitored_item.id(), id)),
631            );
632        }
633
634        if monitored_item.is_reporting() {
635            if resend_data {
636                monitored_item.add_current_value_to_queue();
637            }
638            if monitored_item.has_notifications() {
639                while let Some(notif) = monitored_item.pop_notification() {
640                    notifications.push(notif);
641                    if notifications.len() >= max_notifications && max_notifications > 0 {
642                        messages.push(Self::make_notification_message(
643                            sequence_numbers.next(),
644                            std::mem::take(notifications),
645                            now,
646                        ));
647                    }
648                }
649            }
650        }
651    }
652
653    fn tick_monitored_items(
654        &mut self,
655        now: &DateTimeUtc,
656        resend_data: bool,
657    ) -> Vec<NotificationMessage> {
658        let mut notifications = Vec::new();
659        let mut messages = Vec::new();
660        let mut triggers = Vec::new();
661
662        // If resend data is true, we must visit ever monitored item
663        if resend_data {
664            for monitored_item in self.monitored_items.values_mut() {
665                Self::tick_monitored_item(
666                    monitored_item,
667                    now,
668                    resend_data,
669                    self.max_notifications_per_publish,
670                    &mut triggers,
671                    &mut notifications,
672                    &mut messages,
673                    &mut self.sequence_number,
674                );
675            }
676        } else {
677            for item_id in self.notified_monitored_items.drain() {
678                let Some(monitored_item) = self.monitored_items.get_mut(&item_id) else {
679                    continue;
680                };
681                Self::tick_monitored_item(
682                    monitored_item,
683                    now,
684                    resend_data,
685                    self.max_notifications_per_publish,
686                    &mut triggers,
687                    &mut notifications,
688                    &mut messages,
689                    &mut self.sequence_number,
690                );
691            }
692        }
693
694        self.handle_triggers(now, triggers, &mut notifications, &mut messages);
695
696        if !notifications.is_empty() {
697            messages.push(Self::make_notification_message(
698                self.sequence_number.next(),
699                notifications,
700                now,
701            ));
702        }
703
704        messages
705    }
706
707    /// Reset the keep-alive counter to the maximum keep-alive count of the Subscription.
708    /// The maximum keep-alive count is set by the Client when the Subscription is created
709    /// and may be modified using the ModifySubscription Service
710    pub(super) fn reset_keep_alive_counter(&mut self) {
711        self.keep_alive_counter = self.max_keep_alive_counter;
712    }
713
714    /// Reset the lifetime counter to the value specified for the life time of the subscription
715    /// in the create subscription service
716    pub(super) fn reset_lifetime_counter(&mut self) {
717        self.lifetime_counter = self.max_lifetime_counter;
718    }
719
720    /// Start or restart the publishing timer and decrement the LifetimeCounter Variable.
721    pub(super) fn start_publishing_timer(&mut self) {
722        self.lifetime_counter = self.lifetime_counter.saturating_sub(1);
723        trace!("Decrementing life time counter {}", self.lifetime_counter);
724    }
725
726    /// The ID of this subscription.
727    pub fn id(&self) -> u32 {
728        self.id
729    }
730
731    /// The priority of this subscription.
732    pub fn priority(&self) -> u8 {
733        self.priority
734    }
735
736    pub(super) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
737        self.publishing_interval = publishing_interval;
738        self.reset_lifetime_counter();
739    }
740
741    pub(super) fn set_max_lifetime_counter(&mut self, max_lifetime_counter: u32) {
742        self.max_lifetime_counter = max_lifetime_counter;
743    }
744
745    pub(super) fn set_max_keep_alive_counter(&mut self, max_keep_alive_counter: u32) {
746        self.max_keep_alive_counter = max_keep_alive_counter;
747    }
748
749    pub(super) fn set_priority(&mut self, priority: u8) {
750        self.priority = priority;
751    }
752
753    pub(super) fn set_max_notifications_per_publish(&mut self, max_notifications_per_publish: u64) {
754        self.max_notifications_per_publish = max_notifications_per_publish as usize;
755    }
756
757    pub(super) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
758        self.publishing_enabled = publishing_enabled;
759    }
760
761    /// The publishing interval of this subscription.
762    pub fn publishing_interval(&self) -> Duration {
763        self.publishing_interval
764    }
765
766    /// Whether publishing is enabled on this subscription.
767    pub fn publishing_enabled(&self) -> bool {
768        self.publishing_enabled
769    }
770
771    /// The maximum number of notification messages queued for this subscription.
772    pub fn max_queued_notifications(&self) -> usize {
773        self.max_queued_notifications
774    }
775
776    /// The maximum number of notifications per notification message for this
777    /// subscription.
778    pub fn max_notifications_per_publish(&self) -> usize {
779        self.max_notifications_per_publish
780    }
781
782    /// The current state of the subscription.
783    pub fn state(&self) -> SubscriptionState {
784        self.state
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use std::time::{Duration, Instant};
791
792    use chrono::{TimeDelta, Utc};
793
794    use crate::{
795        subscriptions::monitored_item::{
796            tests::new_monitored_item, FilterType, Notification, SamplingInterval,
797        },
798        SubscriptionState,
799    };
800    use opcua_types::{
801        match_extension_object_owned, AttributeId, DataChangeNotification, DataValue, DateTime,
802        DateTimeUtc, EventNotificationList, MonitoringMode, NodeId, NotificationMessage,
803        ReadValueId, StatusChangeNotification, StatusCode, Variant,
804    };
805
806    use super::{Subscription, TickReason};
807
808    fn get_notifications(message: &NotificationMessage) -> Vec<Notification> {
809        let mut res = Vec::new();
810        for it in message.notification_data.iter().flatten() {
811            let it = it.clone();
812            match_extension_object_owned!(it,
813                notif: DataChangeNotification => {
814                    for n in notif.monitored_items.into_iter().flatten() {
815                        res.push(Notification::MonitoredItemNotification(n));
816                    }
817                },
818                notif: EventNotificationList => {
819                    for n in notif.events.into_iter().flatten() {
820                        res.push(Notification::Event(n));
821                    }
822                },
823                _ => panic!("Wrong message type"),
824            )
825        }
826        res
827    }
828
829    fn offset(time: DateTimeUtc, time_inst: Instant, ms: u64) -> (DateTimeUtc, Instant) {
830        (
831            time + chrono::Duration::try_milliseconds(ms as i64).unwrap(),
832            time_inst + Duration::from_millis(ms),
833        )
834    }
835
836    #[test]
837    fn tick() {
838        let mut sub = Subscription::new(1, true, Duration::from_millis(100), 100, 20, 1, 100, 1000);
839        let start = Instant::now();
840        let start_dt = Utc::now();
841
842        sub.last_time_publishing_interval_elapsed = start;
843
844        // Subscription is creating, handle the first tick.
845        assert_eq!(sub.state, SubscriptionState::Creating);
846        sub.tick(&start_dt, start, TickReason::TickTimerFired, true);
847        assert_eq!(sub.state, SubscriptionState::Normal);
848        assert!(!sub.first_message_sent);
849
850        // Tick again before the publishing interval has elapsed, should change nothing.
851        sub.tick(&start_dt, start, TickReason::TickTimerFired, true);
852        assert_eq!(sub.state, SubscriptionState::Normal);
853        assert!(!sub.first_message_sent);
854
855        // Add a monitored item
856        sub.insert(
857            1,
858            new_monitored_item(
859                1,
860                ReadValueId {
861                    node_id: NodeId::null(),
862                    attribute_id: AttributeId::Value as u32,
863                    ..Default::default()
864                },
865                MonitoringMode::Reporting,
866                FilterType::None,
867                SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
868                false,
869                Some(DataValue::new_now(123)),
870            ),
871        );
872        // New tick at next publishing interval should produce something
873        let (time, time_inst) = offset(start_dt, start, 100);
874        sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
875        assert_eq!(sub.state, SubscriptionState::Normal);
876        assert!(sub.first_message_sent);
877        let notif = sub.take_notification().unwrap();
878        let its = get_notifications(&notif);
879        assert_eq!(its.len(), 1);
880        assert_eq!(notif.sequence_number, 1);
881        let Notification::MonitoredItemNotification(m) = &its[0] else {
882            panic!("Wrong notification type");
883        };
884        assert_eq!(m.value.value, Some(Variant::Int32(123)));
885
886        // Next tick produces nothing
887        let (time, time_inst) = offset(start_dt, start, 200);
888
889        sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
890        // State transitions to keep alive due to empty publish.
891        assert_eq!(sub.state, SubscriptionState::KeepAlive);
892        assert_eq!(sub.lifetime_counter, 98);
893        assert!(sub.first_message_sent);
894        assert!(sub.take_notification().is_none());
895
896        // Enqueue a new notification
897        sub.notify_data_value(
898            &1,
899            DataValue::new_at(
900                321,
901                DateTime::from(start_dt + chrono::Duration::try_milliseconds(300).unwrap()),
902            ),
903            &DateTime::now(),
904        );
905        let (time, time_inst) = offset(start_dt, start, 300);
906        sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
907        // State transitions back to normal.
908        assert_eq!(sub.state, SubscriptionState::Normal);
909        assert!(sub.first_message_sent);
910        assert_eq!(sub.lifetime_counter, 99);
911        let notif = sub.take_notification().unwrap();
912        let its = get_notifications(&notif);
913        assert_eq!(notif.sequence_number, 2);
914        assert_eq!(its.len(), 1);
915        let Notification::MonitoredItemNotification(m) = &its[0] else {
916            panic!("Wrong notification type");
917        };
918        assert_eq!(m.value.value, Some(Variant::Int32(321)));
919
920        for i in 0..20 {
921            let (time, time_inst) = offset(start_dt, start, 1000 + i * 100);
922            sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
923            assert_eq!(sub.state, SubscriptionState::KeepAlive);
924            assert_eq!(sub.lifetime_counter, (99 - i - 1) as u32);
925            assert_eq!(sub.keep_alive_counter, (20 - i) as u32);
926            assert!(sub.take_notification().is_none());
927        }
928        assert_eq!(sub.lifetime_counter, 79);
929        assert_eq!(sub.keep_alive_counter, 1);
930
931        // Tick one more time to get a keep alive
932        let (time, time_inst) = offset(start_dt, start, 3000);
933        sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
934        assert_eq!(sub.state, SubscriptionState::KeepAlive);
935        assert_eq!(sub.lifetime_counter, 78);
936        assert_eq!(sub.keep_alive_counter, 20);
937        let notif = sub.take_notification().unwrap();
938        let its = get_notifications(&notif);
939        assert!(its.is_empty());
940        // The next sequence number...
941        assert_eq!(notif.sequence_number, 3);
942
943        // Tick another 20 times to become late
944        for i in 0..19 {
945            let (time, time_inst) = offset(start_dt, start, 3100 + i * 100);
946            sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
947            assert_eq!(sub.state, SubscriptionState::KeepAlive);
948            assert_eq!(sub.lifetime_counter, (78 - i - 1) as u32);
949        }
950
951        // Tick another 58 times to expire
952        for i in 0..58 {
953            let (time, time_inst) = offset(start_dt, start, 5100 + i * 100);
954            sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
955            assert_eq!(sub.state, SubscriptionState::Late);
956            assert_eq!(sub.lifetime_counter, (58 - i) as u32);
957        }
958        assert_eq!(sub.lifetime_counter, 1);
959
960        let (time, time_inst) = offset(start_dt, start, 20000);
961        sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
962        assert_eq!(sub.state, SubscriptionState::Closed);
963        let notif = sub.take_notification().unwrap();
964        assert_eq!(notif.sequence_number, 3);
965        assert_eq!(1, notif.notification_data.as_ref().unwrap().len());
966        let status_change = notif.notification_data.as_ref().unwrap()[0]
967            .inner_as::<StatusChangeNotification>()
968            .unwrap();
969        assert_eq!(status_change.status, StatusCode::BadTimeout);
970    }
971
972    #[test]
973    fn monitored_item_triggers() {
974        let mut sub = Subscription::new(1, true, Duration::from_millis(100), 100, 20, 1, 100, 1000);
975        let start = Instant::now();
976        let start_dt = Utc::now();
977
978        sub.last_time_publishing_interval_elapsed = start;
979        for i in 0..4 {
980            sub.insert(
981                i + 1,
982                new_monitored_item(
983                    i + 1,
984                    ReadValueId {
985                        node_id: NodeId::null(),
986                        attribute_id: AttributeId::Value as u32,
987                        ..Default::default()
988                    },
989                    if i == 0 {
990                        MonitoringMode::Reporting
991                    } else if i == 3 {
992                        MonitoringMode::Disabled
993                    } else {
994                        MonitoringMode::Sampling
995                    },
996                    FilterType::None,
997                    SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
998                    false,
999                    Some(DataValue::new_at(0, start_dt.into())),
1000                ),
1001            );
1002        }
1003        sub.get_mut(&1).unwrap().set_triggering(&[1, 2, 3, 4], &[]);
1004        // Notify the two sampling items and the disabled item
1005        let (otime, time_inst) = offset(start_dt, start, 100);
1006        let time = otime.into();
1007        sub.notify_data_value(&2, DataValue::new_at(1, time), &time);
1008        sub.notify_data_value(&3, DataValue::new_at(1, time), &time);
1009        sub.notify_data_value(&4, DataValue::new_at(1, time), &time);
1010
1011        // Should not cause a notification
1012        sub.tick(&otime, time_inst, TickReason::TickTimerFired, true);
1013        assert!(sub.take_notification().is_none());
1014
1015        // Notify the first item
1016        sub.notify_data_value(&1, DataValue::new_at(1, time.into()), &time);
1017        let (time, time_inst) = offset(start_dt, start, 200);
1018        sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
1019        let notif = sub.take_notification().unwrap();
1020        let its = get_notifications(&notif);
1021        assert_eq!(its.len(), 6);
1022        for it in its {
1023            let Notification::MonitoredItemNotification(_m) = it else {
1024                panic!("Wrong notification type");
1025            };
1026        }
1027    }
1028}