Skip to main content

opcua/server/subscriptions/
subscription.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5use std::collections::{BTreeSet, HashMap, VecDeque};
6use std::sync::Arc;
7
8use crate::sync::*;
9use crate::types::{
10    service_types::{
11        MonitoredItemCreateRequest, MonitoredItemCreateResult, MonitoredItemModifyRequest,
12        MonitoredItemModifyResult, NotificationMessage, TimestampsToReturn,
13    },
14    status_code::StatusCode,
15    *,
16};
17
18use crate::core::handle::Handle;
19
20use crate::server::{
21    address_space::AddressSpace,
22    constants,
23    diagnostics::ServerDiagnostics,
24    state::ServerState,
25    subscriptions::monitored_item::{MonitoredItem, Notification, TickResult},
26};
27
28/// The state of the subscription
29#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
30pub(crate) enum SubscriptionState {
31    Closed,
32    Creating,
33    Normal,
34    Late,
35    KeepAlive,
36}
37
38#[derive(Debug)]
39pub(crate) struct SubscriptionStateParams {
40    pub notifications_available: bool,
41    pub more_notifications: bool,
42    pub publishing_req_queued: bool,
43    pub publishing_timer_expired: bool,
44}
45
46#[derive(Debug, Copy, Clone, PartialEq)]
47pub enum UpdateStateAction {
48    None,
49    // Return a keep alive
50    ReturnKeepAlive,
51    // Return notifications
52    ReturnNotifications,
53    // The subscription was created normally
54    SubscriptionCreated,
55    // The subscription has expired and must be closed
56    SubscriptionExpired,
57}
58
59/// This is for debugging purposes. It allows the caller to validate the output state if required.
60///
61/// Values correspond to state table in OPC UA Part 4 5.13.1.2
62///
63#[derive(Debug, Copy, Clone, PartialEq)]
64pub(crate) enum HandledState {
65    None0 = 0,
66    Create3 = 3,
67    Normal4 = 4,
68    Normal5 = 5,
69    IntervalElapsed6 = 6,
70    IntervalElapsed7 = 7,
71    IntervalElapsed8 = 8,
72    IntervalElapsed9 = 9,
73    Late10 = 10,
74    Late11 = 11,
75    Late12 = 12,
76    KeepAlive13 = 13,
77    KeepAlive14 = 14,
78    KeepAlive15 = 15,
79    KeepAlive16 = 16,
80    KeepAlive17 = 17,
81    Closed27 = 27,
82}
83
84/// This is for debugging purposes. It allows the caller to validate the output state if required.
85#[derive(Debug)]
86pub(crate) struct UpdateStateResult {
87    pub handled_state: HandledState,
88    pub update_state_action: UpdateStateAction,
89}
90
91impl UpdateStateResult {
92    pub fn new(
93        handled_state: HandledState,
94        update_state_action: UpdateStateAction,
95    ) -> UpdateStateResult {
96        UpdateStateResult {
97            handled_state,
98            update_state_action,
99        }
100    }
101}
102
103#[derive(Debug, Copy, Clone, PartialEq)]
104pub(crate) enum TickReason {
105    ReceivePublishRequest,
106    TickTimerFired,
107}
108
109#[derive(Debug, Clone, Serialize)]
110pub struct Subscription {
111    /// Subscription id
112    subscription_id: u32,
113    /// Publishing interval in milliseconds
114    publishing_interval: Duration,
115    /// The lifetime count reset value
116    max_lifetime_counter: u32,
117    /// Keep alive count reset value
118    max_keep_alive_counter: u32,
119    /// Relative priority of the subscription. When more than one subscriptio
120    ///  needs to send notifications the highest priority subscription should
121    /// be sent first.
122    priority: u8,
123    /// Map of monitored items
124    monitored_items: HashMap<u32, MonitoredItem>,
125    /// State of the subscription
126    state: SubscriptionState,
127    /// A value that contains the number of consecutive publishing timer expirations without Client
128    /// activity before the Subscription is terminated.
129    lifetime_counter: u32,
130    /// Keep alive counter decrements when there are no notifications to publish and when it expires
131    /// requests to send an empty notification as a keep alive event
132    keep_alive_counter: u32,
133    /// boolean value that is set to true to mean that either a NotificationMessage or a keep-alive
134    /// Message has been sent on the Subscription. It is a flag that is used to ensure that either
135    /// a NotificationMessage or a keep-alive Message is sent out the first time the publishing timer
136    /// expires.
137    first_message_sent: bool,
138    /// The parameter that requests publishing to be enabled or disabled.
139    publishing_enabled: bool,
140    /// A flag that tells the subscription to send the latest value of every monitored item on the
141    /// next publish request.
142    resend_data: bool,
143    /// The next sequence number to be sent
144    sequence_number: Handle,
145    /// Last notification's sequence number. This is a sanity check since sequence numbers should start from
146    /// 1 and be sequential - it that doesn't happen the server will panic because something went
147    /// wrong somewhere.
148    last_sequence_number: u32,
149    // The last monitored item id
150    next_monitored_item_id: u32,
151    // The time that the subscription interval last fired
152    last_time_publishing_interval_elapsed: DateTimeUtc,
153    // Currently outstanding notifications to send
154    #[serde(skip)]
155    notifications: VecDeque<NotificationMessage>,
156    /// Server diagnostics to track creation / destruction / modification of the subscription
157    #[serde(skip)]
158    diagnostics: Arc<RwLock<ServerDiagnostics>>,
159    /// Stops the subscription calling diagnostics on drop
160    #[serde(skip)]
161    diagnostics_on_drop: bool,
162}
163
164impl Drop for Subscription {
165    fn drop(&mut self) {
166        if self.diagnostics_on_drop {
167            let mut diagnostics = trace_write_lock!(self.diagnostics);
168            diagnostics.on_destroy_subscription(self);
169        }
170    }
171}
172
173impl Subscription {
174    pub fn new(
175        diagnostics: Arc<RwLock<ServerDiagnostics>>,
176        subscription_id: u32,
177        publishing_enabled: bool,
178        publishing_interval: Duration,
179        lifetime_counter: u32,
180        keep_alive_counter: u32,
181        priority: u8,
182    ) -> Subscription {
183        let subscription = Subscription {
184            subscription_id,
185            publishing_interval,
186            priority,
187            monitored_items: HashMap::with_capacity(constants::DEFAULT_MONITORED_ITEM_CAPACITY),
188            max_lifetime_counter: lifetime_counter,
189            max_keep_alive_counter: keep_alive_counter,
190            // State variables
191            state: SubscriptionState::Creating,
192            lifetime_counter,
193            keep_alive_counter,
194            first_message_sent: false,
195            publishing_enabled,
196            resend_data: false,
197            // Counters for new items
198            sequence_number: Handle::new(1),
199            last_sequence_number: 0,
200            next_monitored_item_id: 1,
201            last_time_publishing_interval_elapsed: chrono::Utc::now(),
202            notifications: VecDeque::with_capacity(100),
203            diagnostics,
204            diagnostics_on_drop: true,
205        };
206        {
207            let mut diagnostics = trace_write_lock!(subscription.diagnostics);
208            diagnostics.on_create_subscription(&subscription);
209        }
210        subscription
211    }
212
213    pub(crate) fn ready_to_remove(&self) -> bool {
214        self.state == SubscriptionState::Closed && self.notifications.is_empty()
215    }
216
217    /// Creates a MonitoredItemCreateResult containing an error code
218    fn monitored_item_create_error(status_code: StatusCode) -> MonitoredItemCreateResult {
219        MonitoredItemCreateResult {
220            status_code,
221            monitored_item_id: 0,
222            revised_sampling_interval: 0f64,
223            revised_queue_size: 0,
224            filter_result: ExtensionObject::null(),
225        }
226    }
227
228    pub fn monitored_items_len(&self) -> usize {
229        self.monitored_items.len()
230    }
231
232    /// Creates monitored items on the specified subscription, returning the creation results
233    pub fn create_monitored_items(
234        &mut self,
235        server_state: &ServerState,
236        address_space: &AddressSpace,
237        now: &DateTimeUtc,
238        timestamps_to_return: TimestampsToReturn,
239        items_to_create: &[MonitoredItemCreateRequest],
240    ) -> Vec<MonitoredItemCreateResult> {
241        self.reset_lifetime_counter();
242
243        // Add items to the subscription if they're not already in its
244        items_to_create
245            .iter()
246            .map(|item_to_create| {
247                if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
248                    Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown)
249                } else {
250                    // TODO validate the attribute id for the type of node
251                    // TODO validate the index range for the node
252
253                    // Create a monitored item, if possible
254                    let monitored_item_id = self.next_monitored_item_id;
255                    match MonitoredItem::new(
256                        now,
257                        monitored_item_id,
258                        timestamps_to_return,
259                        server_state,
260                        item_to_create,
261                    ) {
262                        Ok(monitored_item) => {
263                            if server_state.max_monitored_items_per_sub == 0
264                                || self.monitored_items.len()
265                                    <= server_state.max_monitored_items_per_sub
266                            {
267                                let revised_sampling_interval = monitored_item.sampling_interval();
268                                let revised_queue_size = monitored_item.queue_size() as u32;
269                                // Validate the filter before registering the item
270                                match monitored_item.validate_filter(address_space) {
271                                    Ok(filter_result) => {
272                                        // Register the item with the subscription
273                                        self.monitored_items
274                                            .insert(monitored_item_id, monitored_item);
275                                        self.next_monitored_item_id += 1;
276                                        MonitoredItemCreateResult {
277                                            status_code: StatusCode::Good,
278                                            monitored_item_id,
279                                            revised_sampling_interval,
280                                            revised_queue_size,
281                                            filter_result,
282                                        }
283                                    }
284                                    Err(status_code) => {
285                                        Self::monitored_item_create_error(status_code)
286                                    }
287                                }
288                            } else {
289                                // Number of monitored items exceeds limit per sub
290                                Self::monitored_item_create_error(
291                                    StatusCode::BadTooManyMonitoredItems,
292                                )
293                            }
294                        }
295                        Err(status_code) => Self::monitored_item_create_error(status_code),
296                    }
297                }
298            })
299            .collect()
300    }
301
302    /// Modify the specified monitored items, returning a result for each
303    pub fn modify_monitored_items(
304        &mut self,
305        server_state: &ServerState,
306        address_space: &AddressSpace,
307        timestamps_to_return: TimestampsToReturn,
308        items_to_modify: &[MonitoredItemModifyRequest],
309    ) -> Vec<MonitoredItemModifyResult> {
310        self.reset_lifetime_counter();
311        items_to_modify
312            .iter()
313            .map(|item_to_modify| {
314                match self
315                    .monitored_items
316                    .get_mut(&item_to_modify.monitored_item_id)
317                {
318                    Some(monitored_item) => {
319                        // Try to change the monitored item according to the modify request
320                        let modify_result = monitored_item.modify(
321                            server_state,
322                            address_space,
323                            timestamps_to_return,
324                            item_to_modify,
325                        );
326                        match modify_result {
327                            Ok(filter_result) => MonitoredItemModifyResult {
328                                status_code: StatusCode::Good,
329                                revised_sampling_interval: monitored_item.sampling_interval(),
330                                revised_queue_size: monitored_item.queue_size() as u32,
331                                filter_result,
332                            },
333                            Err(err) => MonitoredItemModifyResult {
334                                status_code: err,
335                                revised_sampling_interval: 0f64,
336                                revised_queue_size: 0,
337                                filter_result: ExtensionObject::null(),
338                            },
339                        }
340                    }
341                    // Item does not exist
342                    None => MonitoredItemModifyResult {
343                        status_code: StatusCode::BadMonitoredItemIdInvalid,
344                        revised_sampling_interval: 0f64,
345                        revised_queue_size: 0,
346                        filter_result: ExtensionObject::null(),
347                    },
348                }
349            })
350            .collect()
351    }
352
353    /// Sets the monitoring mode on one monitored item
354    pub fn set_monitoring_mode(
355        &mut self,
356        monitored_item_id: u32,
357        monitoring_mode: MonitoringMode,
358    ) -> StatusCode {
359        if let Some(monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
360            monitored_item.set_monitoring_mode(monitoring_mode);
361            StatusCode::Good
362        } else {
363            StatusCode::BadMonitoredItemIdInvalid
364        }
365    }
366
367    /// Delete the specified monitored items (by item id), returning a status code for each
368    pub fn delete_monitored_items(&mut self, items_to_delete: &[u32]) -> Vec<StatusCode> {
369        self.reset_lifetime_counter();
370        items_to_delete
371            .iter()
372            .map(
373                |item_to_delete| match self.monitored_items.remove(item_to_delete) {
374                    Some(_) => StatusCode::Good,
375                    None => StatusCode::BadMonitoredItemIdInvalid,
376                },
377            )
378            .collect()
379    }
380
381    // Returns two vecs representing the server and client handles for each monitored item.
382    // Called from the GetMonitoredItems impl
383    pub fn get_handles(&self) -> (Vec<u32>, Vec<u32>) {
384        let server_handles = self
385            .monitored_items
386            .values()
387            .map(|i| i.monitored_item_id())
388            .collect();
389        let client_handles = self
390            .monitored_items
391            .values()
392            .map(|i| i.client_handle())
393            .collect();
394        (server_handles, client_handles)
395    }
396
397    /// Sets the resend data flag which means the next publish request will receive the latest value
398    /// of every monitored item whether it has changed in this cycle or not.
399    pub fn set_resend_data(&mut self) {
400        self.resend_data = true;
401    }
402
403    /// Tests if the publishing interval has elapsed since the last time this function in which case
404    /// it returns `true` and updates its internal state.
405    fn test_and_set_publishing_interval_elapsed(&mut self, now: &DateTimeUtc) -> bool {
406        // Look at the last expiration time compared to now and see if it matches
407        // or exceeds the publishing interval
408        let publishing_interval = super::duration_from_ms(self.publishing_interval);
409        // TODO unwrap logic needs to change
410        let elapsed = now
411            .signed_duration_since(self.last_time_publishing_interval_elapsed)
412            .to_std()
413            .unwrap();
414        if elapsed >= publishing_interval {
415            self.last_time_publishing_interval_elapsed = *now;
416            true
417        } else {
418            false
419        }
420    }
421
422    /// Checks the subscription and monitored items for state change, messages. Returns `true`
423    /// if there are zero or more notifications waiting to be processed.
424    pub(crate) fn tick(
425        &mut self,
426        now: &DateTimeUtc,
427        address_space: &AddressSpace,
428        tick_reason: TickReason,
429        publishing_req_queued: bool,
430    ) {
431        // Check if the publishing interval has elapsed. Only checks on the tick timer.
432        let publishing_interval_elapsed = match tick_reason {
433            TickReason::ReceivePublishRequest => false,
434            TickReason::TickTimerFired => {
435                if self.state == SubscriptionState::Creating {
436                    true
437                } else if self.publishing_interval <= 0f64 {
438                    panic!("Publishing interval should have been revised to min interval")
439                } else {
440                    self.test_and_set_publishing_interval_elapsed(now)
441                }
442            }
443        };
444
445        // Do a tick on monitored items. Note that monitored items normally update when the interval
446        // elapses but they don't have to. So this is called every tick just to catch items with their
447        // own intervals.
448
449        let notification = match self.state {
450            SubscriptionState::Closed | SubscriptionState::Creating => None,
451            _ => {
452                let resend_data = self.resend_data;
453                self.tick_monitored_items(
454                    now,
455                    address_space,
456                    publishing_interval_elapsed,
457                    resend_data,
458                )
459            }
460        };
461        self.resend_data = false;
462
463        let notifications_available = !self.notifications.is_empty() || notification.is_some();
464        let more_notifications = self.notifications.len() > 1;
465
466        // If items have changed or subscription interval elapsed then we may have notifications
467        // to send or state to update
468        if notifications_available || publishing_interval_elapsed || publishing_req_queued {
469            // Update the internal state of the subscription based on what happened
470            let update_state_result = self.update_state(
471                tick_reason,
472                SubscriptionStateParams {
473                    publishing_req_queued,
474                    notifications_available,
475                    more_notifications,
476                    publishing_timer_expired: publishing_interval_elapsed,
477                },
478            );
479            trace!(
480                "subscription tick - update_state_result = {:?}",
481                update_state_result
482            );
483            self.handle_state_result(now, update_state_result, notification);
484        }
485    }
486
487    fn enqueue_notification(&mut self, notification: NotificationMessage) {
488        use std::u32;
489        // For sanity, check the sequence number is the expected sequence number.
490        let expected_sequence_number = if self.last_sequence_number == u32::MAX {
491            1
492        } else {
493            self.last_sequence_number + 1
494        };
495        if notification.sequence_number != expected_sequence_number {
496            panic!(
497                "Notification's sequence number is not sequential, expecting {}, got {}",
498                expected_sequence_number, notification.sequence_number
499            );
500        }
501        // debug!("Enqueuing notification {:?}", notification);
502        self.last_sequence_number = notification.sequence_number;
503        self.notifications.push_back(notification);
504    }
505
506    fn handle_state_result(
507        &mut self,
508        now: &DateTimeUtc,
509        update_state_result: UpdateStateResult,
510        notification: Option<NotificationMessage>,
511    ) {
512        // Now act on the state's action
513        match update_state_result.update_state_action {
514            UpdateStateAction::None => {
515                if let Some(ref notification) = notification {
516                    // Reset the next sequence number to the discarded notification
517                    let notification_sequence_number = notification.sequence_number;
518                    self.sequence_number.set_next(notification_sequence_number);
519                    debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result);
520                }
521                // Send nothing
522            }
523            UpdateStateAction::ReturnKeepAlive => {
524                if let Some(ref notification) = notification {
525                    // Reset the next sequence number to the discarded notification
526                    let notification_sequence_number = notification.sequence_number;
527                    self.sequence_number.set_next(notification_sequence_number);
528                    debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result);
529                }
530                // Send a keep alive
531                debug!("Sending keep alive response");
532                let notification = NotificationMessage::keep_alive(
533                    self.sequence_number.next(),
534                    DateTime::from(*now),
535                );
536                self.enqueue_notification(notification);
537            }
538            UpdateStateAction::ReturnNotifications => {
539                // Add the notification message to the queue
540                if let Some(notification) = notification {
541                    self.enqueue_notification(notification);
542                }
543            }
544            UpdateStateAction::SubscriptionCreated => {
545                if notification.is_some() {
546                    panic!("SubscriptionCreated got a notification");
547                }
548                // Subscription was created successfully
549                //                let notification = NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good);
550                //                self.enqueue_notification(notification);
551            }
552            UpdateStateAction::SubscriptionExpired => {
553                if notification.is_some() {
554                    panic!("SubscriptionExpired got a notification");
555                }
556                // Delete the monitored items, issue a status change for the subscription
557                debug!("Subscription status change to closed / timeout");
558                self.monitored_items.clear();
559                let notification = NotificationMessage::status_change(
560                    self.sequence_number.next(),
561                    DateTime::from(*now),
562                    StatusCode::BadTimeout,
563                );
564                self.enqueue_notification(notification);
565            }
566        }
567    }
568
569    pub(crate) fn take_notification(&mut self) -> Option<NotificationMessage> {
570        self.notifications.pop_front()
571    }
572
573    // See OPC UA Part 4 5.13.1.2 State Table
574    //
575    // This function implements the main guts of updating the subscription's state according to
576    // some input events and its existing internal state.
577    //
578    // Calls to the function will update the internal state of and return a tuple with any required
579    // actions.
580    //
581    // Note that some state events are handled outside of update_state. e.g. the subscription
582    // is created elsewhere which handles states 1, 2 and 3.
583    //
584    // Inputs:
585    //
586    // * publish_request - an optional publish request. May be used by subscription to remove acknowledged notifications
587    // * publishing_interval_elapsed - true if the publishing interval has elapsed
588    //
589    // Returns in order:
590    //
591    // * State id that handled this call. Useful for debugging which state handler triggered
592    // * Update state action - none, return notifications, return keep alive
593    // * Publishing request action - nothing, dequeue
594    //
595    pub(crate) fn update_state(
596        &mut self,
597        tick_reason: TickReason,
598        p: SubscriptionStateParams,
599    ) -> UpdateStateResult {
600        // This function is called when a publish request is received OR the timer expired, so getting
601        // both is invalid code somewhere
602        if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired {
603            panic!("Should not be possible for timer to have expired and received publish request at same time")
604        }
605
606        // Extra state debugging
607        {
608            use log::Level::Trace;
609            if log_enabled!(Trace) {
610                trace!(
611                    r#"State inputs:
612    subscription_id: {} / state: {:?}
613    tick_reason: {:?} / state_params: {:?}
614    publishing_enabled: {}
615    keep_alive_counter / lifetime_counter: {} / {}
616    message_sent: {}"#,
617                    self.subscription_id,
618                    self.state,
619                    tick_reason,
620                    p,
621                    self.publishing_enabled,
622                    self.keep_alive_counter,
623                    self.lifetime_counter,
624                    self.first_message_sent
625                );
626            }
627        }
628
629        // This is a state engine derived from OPC UA Part 4 Publish service and might look a
630        // little odd for that.
631        //
632        // Note in some cases, some of the actions have already happened outside of this function.
633        // For example, publish requests are already queued before we come in here and this function
634        // uses what its given. Likewise, this function does not "send" notifications, rather
635        // it returns them (if any) and it is up to the caller to send them
636
637        // more state tests that match on more than one state
638        match self.state {
639            SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive => {
640                if self.lifetime_counter == 1 {
641                    // State #27
642                    self.state = SubscriptionState::Closed;
643                    return UpdateStateResult::new(
644                        HandledState::Closed27,
645                        UpdateStateAction::SubscriptionExpired,
646                    );
647                }
648            }
649            _ => {
650                // DO NOTHING
651            }
652        }
653
654        match self.state {
655            SubscriptionState::Creating => {
656                // State #2
657                // CreateSubscription fails, return negative response
658                // Handled in message handler
659                // State #3
660                self.state = SubscriptionState::Normal;
661                self.first_message_sent = false;
662                return UpdateStateResult::new(
663                    HandledState::Create3,
664                    UpdateStateAction::SubscriptionCreated,
665                );
666            }
667            SubscriptionState::Normal => {
668                if tick_reason == TickReason::ReceivePublishRequest
669                    && (!self.publishing_enabled
670                        || (self.publishing_enabled && !p.more_notifications))
671                {
672                    // State #4
673                    return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None);
674                } else if tick_reason == TickReason::ReceivePublishRequest
675                    && self.publishing_enabled
676                    && p.more_notifications
677                {
678                    // State #5
679                    self.reset_lifetime_counter();
680                    self.first_message_sent = true;
681                    return UpdateStateResult::new(
682                        HandledState::Normal5,
683                        UpdateStateAction::ReturnNotifications,
684                    );
685                } else if p.publishing_timer_expired
686                    && p.publishing_req_queued
687                    && self.publishing_enabled
688                    && p.notifications_available
689                {
690                    // State #6
691                    self.reset_lifetime_counter();
692                    self.start_publishing_timer();
693                    self.first_message_sent = true;
694                    return UpdateStateResult::new(
695                        HandledState::IntervalElapsed6,
696                        UpdateStateAction::ReturnNotifications,
697                    );
698                } else if p.publishing_timer_expired
699                    && p.publishing_req_queued
700                    && !self.first_message_sent
701                    && (!self.publishing_enabled
702                        || (self.publishing_enabled && !p.notifications_available))
703                {
704                    // State #7
705                    self.reset_lifetime_counter();
706                    self.start_publishing_timer();
707                    self.first_message_sent = true;
708                    return UpdateStateResult::new(
709                        HandledState::IntervalElapsed7,
710                        UpdateStateAction::ReturnKeepAlive,
711                    );
712                } else if p.publishing_timer_expired
713                    && !p.publishing_req_queued
714                    && (!self.first_message_sent
715                        || (self.publishing_enabled && p.notifications_available))
716                {
717                    // State #8
718                    self.start_publishing_timer();
719                    self.state = SubscriptionState::Late;
720                    return UpdateStateResult::new(
721                        HandledState::IntervalElapsed8,
722                        UpdateStateAction::None,
723                    );
724                } else if p.publishing_timer_expired
725                    && self.first_message_sent
726                    && (!self.publishing_enabled
727                        || (self.publishing_enabled && !p.notifications_available))
728                {
729                    // State #9
730                    self.start_publishing_timer();
731                    self.reset_keep_alive_counter();
732                    self.state = SubscriptionState::KeepAlive;
733                    return UpdateStateResult::new(
734                        HandledState::IntervalElapsed9,
735                        UpdateStateAction::None,
736                    );
737                }
738            }
739            SubscriptionState::Late => {
740                if tick_reason == TickReason::ReceivePublishRequest
741                    && self.publishing_enabled
742                    && (p.notifications_available || p.more_notifications)
743                {
744                    // State #10
745                    self.reset_lifetime_counter();
746                    self.state = SubscriptionState::Normal;
747                    self.first_message_sent = true;
748                    return UpdateStateResult::new(
749                        HandledState::Late10,
750                        UpdateStateAction::ReturnNotifications,
751                    );
752                } else if tick_reason == TickReason::ReceivePublishRequest
753                    && (!self.publishing_enabled
754                        || (self.publishing_enabled
755                            && !p.notifications_available
756                            && !p.more_notifications))
757                {
758                    // State #11
759                    self.reset_lifetime_counter();
760                    self.state = SubscriptionState::KeepAlive;
761                    self.first_message_sent = true;
762                    return UpdateStateResult::new(
763                        HandledState::Late11,
764                        UpdateStateAction::ReturnKeepAlive,
765                    );
766                } else if p.publishing_timer_expired {
767                    // State #12
768                    self.start_publishing_timer();
769                    return UpdateStateResult::new(HandledState::Late12, UpdateStateAction::None);
770                }
771            }
772            SubscriptionState::KeepAlive => {
773                if tick_reason == TickReason::ReceivePublishRequest {
774                    // State #13
775                    return UpdateStateResult::new(
776                        HandledState::KeepAlive13,
777                        UpdateStateAction::None,
778                    );
779                } else if p.publishing_timer_expired
780                    && self.publishing_enabled
781                    && p.notifications_available
782                    && p.publishing_req_queued
783                {
784                    // State #14
785                    self.first_message_sent = true;
786                    self.state = SubscriptionState::Normal;
787                    return UpdateStateResult::new(
788                        HandledState::KeepAlive14,
789                        UpdateStateAction::ReturnNotifications,
790                    );
791                } else if p.publishing_timer_expired
792                    && p.publishing_req_queued
793                    && self.keep_alive_counter == 1
794                    && (!self.publishing_enabled
795                        || (self.publishing_enabled && p.notifications_available))
796                {
797                    // State #15
798                    self.start_publishing_timer();
799                    self.reset_keep_alive_counter();
800                    return UpdateStateResult::new(
801                        HandledState::KeepAlive15,
802                        UpdateStateAction::ReturnKeepAlive,
803                    );
804                } else if p.publishing_timer_expired
805                    && self.keep_alive_counter > 1
806                    && (!self.publishing_enabled
807                        || (self.publishing_enabled && !p.notifications_available))
808                {
809                    // State #16
810                    self.start_publishing_timer();
811                    self.keep_alive_counter -= 1;
812                    return UpdateStateResult::new(
813                        HandledState::KeepAlive16,
814                        UpdateStateAction::None,
815                    );
816                } else if p.publishing_timer_expired
817                    && !p.publishing_req_queued
818                    && (self.keep_alive_counter == 1
819                        || (self.keep_alive_counter > 1
820                            && self.publishing_enabled
821                            && p.notifications_available))
822                {
823                    // State #17
824                    self.start_publishing_timer();
825                    self.state = SubscriptionState::Late;
826                    return UpdateStateResult::new(
827                        HandledState::KeepAlive17,
828                        UpdateStateAction::None,
829                    );
830                }
831            }
832            _ => {
833                // DO NOTHING
834            }
835        }
836
837        UpdateStateResult::new(HandledState::None0, UpdateStateAction::None)
838    }
839
840    /// Iterate through the monitored items belonging to the subscription, calling tick on each in turn.
841    ///
842    /// Items that are in a reporting state, or triggered to report will be have their pending notifications
843    /// collected together when the publish interval elapsed flag is `true`.
844    ///
845    /// The function returns a `notifications` and a `more_notifications` boolean to indicate if the notifications
846    /// are available.
847    fn tick_monitored_items(
848        &mut self,
849        now: &DateTimeUtc,
850        address_space: &AddressSpace,
851        publishing_interval_elapsed: bool,
852        resend_data: bool,
853    ) -> Option<NotificationMessage> {
854        let mut triggered_items: BTreeSet<u32> = BTreeSet::new();
855        let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2);
856
857        for monitored_item in self.monitored_items.values_mut() {
858            // If this returns true then the monitored item wants to report its notification
859            let monitoring_mode = monitored_item.monitoring_mode();
860            match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data)
861            {
862                TickResult::ReportValueChanged => {
863                    if publishing_interval_elapsed {
864                        // If this monitored item has triggered items, then they need to be handled
865                        match monitoring_mode {
866                            MonitoringMode::Reporting => {
867                                // From triggering docs
868                                // If the monitoring mode of the triggering item is REPORTING, then it is reported when the
869                                // triggering item triggers the items to report.
870                                monitored_item.triggered_items().iter().for_each(|i| {
871                                    triggered_items.insert(*i);
872                                })
873                            }
874                            _ => {
875                                // Sampling should have gone in the other branch. Disabled shouldn't do anything.
876                                panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode);
877                            }
878                        }
879                        // Take some / all of the monitored item's pending notifications
880                        if let Some(mut item_notification_messages) =
881                            monitored_item.all_notifications()
882                        {
883                            monitored_item_notifications.append(&mut item_notification_messages);
884                        }
885                    }
886                }
887                TickResult::ValueChanged => {
888                    // The monitored item doesn't have changes to report but its value did change so it
889                    // is still necessary to check its triggered items.
890                    if publishing_interval_elapsed {
891                        match monitoring_mode {
892                            MonitoringMode::Sampling => {
893                                // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the
894                                // triggering item triggers the items to report.
895                                monitored_item.triggered_items().iter().for_each(|i| {
896                                    triggered_items.insert(*i);
897                                })
898                            }
899                            _ => {
900                                // Reporting should have gone in the other branch. Disabled shouldn't do anything.
901                                panic!("How can there be a value change when the mode is not sampling?");
902                            }
903                        }
904                    }
905                }
906                TickResult::NoChange => {
907                    // Ignore
908                }
909            }
910        }
911
912        // Are there any triggered items to force a change on?
913        triggered_items.iter().for_each(|i| {
914            if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) {
915                // Check the monitoring mode of the item to report
916                match monitored_item.monitoring_mode() {
917                    MonitoringMode::Sampling => {
918                        // If the monitoring mode of the item to report is SAMPLING, then it is reported when the
919                        // triggering item triggers the i tems to report.
920                        //
921                        // Call with the resend_data flag as true to force the monitored item to
922                        monitored_item.check_value(address_space, now, true);
923                        if let Some(mut notifications) = monitored_item.all_notifications() {
924                            monitored_item_notifications.append(&mut notifications);
925                        }
926                    }
927                    MonitoringMode::Reporting => {
928                        // If the monitoring mode of the item to report is REPORTING, this effectively causes the
929                        // triggering item to be ignored. All notifications of the items to report are sent after the
930                        // publishing interval expires.
931                        //
932                        // DO NOTHING
933                    }
934                    MonitoringMode::Disabled => {
935                        // DO NOTHING
936                    }
937                }
938            } else {
939                // It is possible that a monitored item contains a triggered id which has been deleted, so silently
940                // ignore that case.
941            }
942        });
943
944        // Produce a data change notification
945        if !monitored_item_notifications.is_empty() {
946            let next_sequence_number = self.sequence_number.next();
947
948            trace!(
949                "Create notification for subscription {}, sequence number {}",
950                self.subscription_id,
951                next_sequence_number
952            );
953
954            // Collect all datachange notifications
955            let data_change_notifications = monitored_item_notifications
956                .iter()
957                .filter(|v| matches!(v, Notification::MonitoredItemNotification(_)))
958                .map(|v| {
959                    if let Notification::MonitoredItemNotification(v) = v {
960                        v.clone()
961                    } else {
962                        panic!()
963                    }
964                })
965                .collect();
966
967            // Collect event notifications
968            let event_notifications = monitored_item_notifications
969                .iter()
970                .filter(|v| matches!(v, Notification::Event(_)))
971                .map(|v| {
972                    if let Notification::Event(v) = v {
973                        v.clone()
974                    } else {
975                        panic!()
976                    }
977                })
978                .collect();
979
980            // Make a notification
981            let notification = NotificationMessage::data_change(
982                next_sequence_number,
983                DateTime::from(*now),
984                data_change_notifications,
985                event_notifications,
986            );
987            Some(notification)
988        } else {
989            None
990        }
991    }
992
993    /// Reset the keep-alive counter to the maximum keep-alive count of the Subscription.
994    /// The maximum keep-alive count is set by the Client when the Subscription is created
995    /// and may be modified using the ModifySubscription Service
996    pub fn reset_keep_alive_counter(&mut self) {
997        self.keep_alive_counter = self.max_keep_alive_counter;
998    }
999
1000    /// Reset the lifetime counter to the value specified for the life time of the subscription
1001    /// in the create subscription service
1002    pub fn reset_lifetime_counter(&mut self) {
1003        self.lifetime_counter = self.max_lifetime_counter;
1004    }
1005
1006    /// Start or restart the publishing timer and decrement the LifetimeCounter Variable.
1007    pub fn start_publishing_timer(&mut self) {
1008        self.lifetime_counter -= 1;
1009        trace!("Decrementing life time counter {}", self.lifetime_counter);
1010    }
1011
1012    pub fn subscription_id(&self) -> u32 {
1013        self.subscription_id
1014    }
1015
1016    pub fn lifetime_counter(&self) -> u32 {
1017        self.lifetime_counter
1018    }
1019
1020    #[cfg(test)]
1021    pub(crate) fn set_current_lifetime_count(&mut self, current_lifetime_count: u32) {
1022        self.lifetime_counter = current_lifetime_count;
1023    }
1024
1025    pub fn keep_alive_counter(&self) -> u32 {
1026        self.keep_alive_counter
1027    }
1028
1029    #[cfg(test)]
1030    pub(crate) fn set_keep_alive_counter(&mut self, keep_alive_counter: u32) {
1031        self.keep_alive_counter = keep_alive_counter;
1032    }
1033
1034    #[cfg(test)]
1035    pub(crate) fn state(&self) -> SubscriptionState {
1036        self.state
1037    }
1038
1039    #[cfg(test)]
1040    pub(crate) fn set_state(&mut self, state: SubscriptionState) {
1041        self.state = state;
1042    }
1043
1044    pub fn message_sent(&self) -> bool {
1045        self.first_message_sent
1046    }
1047
1048    #[cfg(test)]
1049    pub(crate) fn set_message_sent(&mut self, message_sent: bool) {
1050        self.first_message_sent = message_sent;
1051    }
1052
1053    pub fn publishing_interval(&self) -> Duration {
1054        self.publishing_interval
1055    }
1056
1057    pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
1058        self.publishing_interval = publishing_interval;
1059        self.reset_lifetime_counter();
1060    }
1061
1062    pub fn max_keep_alive_count(&self) -> u32 {
1063        self.max_keep_alive_counter
1064    }
1065
1066    pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
1067        self.max_keep_alive_counter = max_keep_alive_count;
1068    }
1069
1070    pub fn max_lifetime_count(&self) -> u32 {
1071        self.max_lifetime_counter
1072    }
1073
1074    pub(crate) fn set_max_lifetime_count(&mut self, max_lifetime_count: u32) {
1075        self.max_lifetime_counter = max_lifetime_count;
1076    }
1077
1078    pub fn priority(&self) -> u8 {
1079        self.priority
1080    }
1081
1082    pub(crate) fn set_priority(&mut self, priority: u8) {
1083        self.priority = priority;
1084    }
1085
1086    pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
1087        self.publishing_enabled = publishing_enabled;
1088        self.reset_lifetime_counter();
1089    }
1090
1091    pub(crate) fn set_diagnostics_on_drop(&mut self, diagnostics_on_drop: bool) {
1092        self.diagnostics_on_drop = diagnostics_on_drop;
1093    }
1094
1095    fn validate_triggered_items(
1096        &self,
1097        monitored_item_id: u32,
1098        items: &[u32],
1099    ) -> (Vec<StatusCode>, Vec<u32>) {
1100        // Monitored items can only trigger on other items in the subscription that exist
1101        let is_good_monitored_item =
1102            |i| self.monitored_items.contains_key(i) && *i != monitored_item_id;
1103        let is_good_monitored_item_result = |i| {
1104            if is_good_monitored_item(i) {
1105                StatusCode::Good
1106            } else {
1107                StatusCode::BadMonitoredItemIdInvalid
1108            }
1109        };
1110
1111        // Find monitored items that do or do not exist
1112        let results: Vec<StatusCode> = items.iter().map(is_good_monitored_item_result).collect();
1113        let items: Vec<u32> = items
1114            .iter()
1115            .filter(|i| is_good_monitored_item(i))
1116            .copied()
1117            .collect();
1118
1119        (results, items)
1120    }
1121
1122    /// Sets the triggering monitored items on a subscription. This function will validate that
1123    /// the items to add / remove actually exist and will only pass through existing monitored items
1124    /// onto the monitored item itself.
1125    pub(crate) fn set_triggering(
1126        &mut self,
1127        monitored_item_id: u32,
1128        items_to_add: &[u32],
1129        items_to_remove: &[u32],
1130    ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
1131        // Find monitored items that do or do not exist
1132        let (add_results, items_to_add) =
1133            self.validate_triggered_items(monitored_item_id, items_to_add);
1134        let (remove_results, items_to_remove) =
1135            self.validate_triggered_items(monitored_item_id, items_to_remove);
1136
1137        if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
1138            // Set the triggering monitored items
1139            monitored_item.set_triggering(items_to_add.as_slice(), items_to_remove.as_slice());
1140
1141            Ok((add_results, remove_results))
1142        } else {
1143            // This monitored item is unrecognized
1144            Err(StatusCode::BadMonitoredItemIdInvalid)
1145        }
1146    }
1147}