opcua_server/subscriptions/
subscription.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5use std::collections::{BTreeSet, HashMap, VecDeque};
6use std::sync::{Arc, RwLock};
7
8use opcua_types::{
9    service_types::{
10        MonitoredItemCreateRequest, MonitoredItemCreateResult, MonitoredItemModifyRequest,
11        MonitoredItemModifyResult, NotificationMessage, TimestampsToReturn,
12    },
13    status_code::StatusCode,
14    *,
15};
16
17use opcua_core::handle::Handle;
18
19use crate::{
20    address_space::AddressSpace,
21    constants,
22    diagnostics::ServerDiagnostics,
23    state::ServerState,
24    subscriptions::monitored_item::{MonitoredItem, Notification, TickResult},
25};
26
27/// The state of the subscription
28#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
29pub(crate) enum SubscriptionState {
30    Closed,
31    Creating,
32    Normal,
33    Late,
34    KeepAlive,
35}
36
37#[derive(Debug)]
38pub(crate) struct SubscriptionStateParams {
39    pub notifications_available: bool,
40    pub more_notifications: bool,
41    pub publishing_req_queued: bool,
42    pub publishing_timer_expired: bool,
43}
44
45#[derive(Debug, Copy, Clone, PartialEq)]
46pub enum UpdateStateAction {
47    None,
48    // Return a keep alive
49    ReturnKeepAlive,
50    // Return notifications
51    ReturnNotifications,
52    // The subscription was created normally
53    SubscriptionCreated,
54    // The subscription has expired and must be closed
55    SubscriptionExpired,
56}
57
58/// This is for debugging purposes. It allows the caller to validate the output state if required.
59///
60/// Values correspond to state table in OPC UA Part 4 5.13.1.2
61///
62#[derive(Debug, Copy, Clone, PartialEq)]
63pub(crate) enum HandledState {
64    None0 = 0,
65    Create3 = 3,
66    Normal4 = 4,
67    Normal5 = 5,
68    IntervalElapsed6 = 6,
69    IntervalElapsed7 = 7,
70    IntervalElapsed8 = 8,
71    IntervalElapsed9 = 9,
72    Late10 = 10,
73    Late11 = 11,
74    Late12 = 12,
75    KeepAlive13 = 13,
76    KeepAlive14 = 14,
77    KeepAlive15 = 15,
78    KeepAlive16 = 16,
79    KeepAlive17 = 17,
80    Closed27 = 27,
81}
82
83/// This is for debugging purposes. It allows the caller to validate the output state if required.
84#[derive(Debug)]
85pub(crate) struct UpdateStateResult {
86    pub handled_state: HandledState,
87    pub update_state_action: UpdateStateAction,
88}
89
90impl UpdateStateResult {
91    pub fn new(
92        handled_state: HandledState,
93        update_state_action: UpdateStateAction,
94    ) -> UpdateStateResult {
95        UpdateStateResult {
96            handled_state,
97            update_state_action,
98        }
99    }
100}
101
102#[derive(Debug, Copy, Clone, PartialEq)]
103pub(crate) enum TickReason {
104    ReceivePublishRequest,
105    TickTimerFired,
106}
107
108#[derive(Debug, Clone, Serialize)]
109pub struct Subscription {
110    /// Subscription id
111    subscription_id: u32,
112    /// Publishing interval in milliseconds
113    publishing_interval: Duration,
114    /// The lifetime count reset value
115    max_lifetime_counter: u32,
116    /// Keep alive count reset value
117    max_keep_alive_counter: u32,
118    /// Relative priority of the subscription. When more than one subscriptio
119    ///  needs to send notifications the highest priority subscription should
120    /// be sent first.
121    priority: u8,
122    /// Map of monitored items
123    monitored_items: HashMap<u32, MonitoredItem>,
124    /// State of the subscription
125    state: SubscriptionState,
126    /// A value that contains the number of consecutive publishing timer expirations without Client
127    /// activity before the Subscription is terminated.
128    lifetime_counter: u32,
129    /// Keep alive counter decrements when there are no notifications to publish and when it expires
130    /// requests to send an empty notification as a keep alive event
131    keep_alive_counter: u32,
132    /// boolean value that is set to true to mean that either a NotificationMessage or a keep-alive
133    /// Message has been sent on the Subscription. It is a flag that is used to ensure that either
134    /// a NotificationMessage or a keep-alive Message is sent out the first time the publishing timer
135    /// expires.
136    first_message_sent: bool,
137    /// The parameter that requests publishing to be enabled or disabled.
138    publishing_enabled: bool,
139    /// A flag that tells the subscription to send the latest value of every monitored item on the
140    /// next publish request.
141    resend_data: bool,
142    /// The next sequence number to be sent
143    sequence_number: Handle,
144    /// Last notification's sequence number. This is a sanity check since sequence numbers should start from
145    /// 1 and be sequential - it that doesn't happen the server will panic because something went
146    /// wrong somewhere.
147    last_sequence_number: u32,
148    // The last monitored item id
149    next_monitored_item_id: u32,
150    // The time that the subscription interval last fired
151    last_time_publishing_interval_elapsed: DateTimeUtc,
152    // Currently outstanding notifications to send
153    #[serde(skip)]
154    notifications: VecDeque<NotificationMessage>,
155    /// Server diagnostics to track creation / destruction / modification of the subscription
156    #[serde(skip)]
157    diagnostics: Arc<RwLock<ServerDiagnostics>>,
158    /// Stops the subscription calling diagnostics on drop
159    #[serde(skip)]
160    diagnostics_on_drop: bool,
161}
162
163impl Drop for Subscription {
164    fn drop(&mut self) {
165        if self.diagnostics_on_drop {
166            let mut diagnostics = trace_write_lock!(self.diagnostics);
167            diagnostics.on_destroy_subscription(self);
168        }
169    }
170}
171
172impl Subscription {
173    pub fn new(
174        diagnostics: Arc<RwLock<ServerDiagnostics>>,
175        subscription_id: u32,
176        publishing_enabled: bool,
177        publishing_interval: Duration,
178        lifetime_counter: u32,
179        keep_alive_counter: u32,
180        priority: u8,
181    ) -> Subscription {
182        let subscription = Subscription {
183            subscription_id,
184            publishing_interval,
185            priority,
186            monitored_items: HashMap::with_capacity(constants::DEFAULT_MONITORED_ITEM_CAPACITY),
187            max_lifetime_counter: lifetime_counter,
188            max_keep_alive_counter: keep_alive_counter,
189            // State variables
190            state: SubscriptionState::Creating,
191            lifetime_counter,
192            keep_alive_counter,
193            first_message_sent: false,
194            publishing_enabled,
195            resend_data: false,
196            // Counters for new items
197            sequence_number: Handle::new(1),
198            last_sequence_number: 0,
199            next_monitored_item_id: 1,
200            last_time_publishing_interval_elapsed: chrono::Utc::now(),
201            notifications: VecDeque::with_capacity(100),
202            diagnostics,
203            diagnostics_on_drop: true,
204        };
205        {
206            let mut diagnostics = trace_write_lock!(subscription.diagnostics);
207            diagnostics.on_create_subscription(&subscription);
208        }
209        subscription
210    }
211
212    pub(crate) fn ready_to_remove(&self) -> bool {
213        self.state == SubscriptionState::Closed && self.notifications.is_empty()
214    }
215
216    /// Creates a MonitoredItemCreateResult containing an error code
217    fn monitored_item_create_error(status_code: StatusCode) -> MonitoredItemCreateResult {
218        MonitoredItemCreateResult {
219            status_code,
220            monitored_item_id: 0,
221            revised_sampling_interval: 0f64,
222            revised_queue_size: 0,
223            filter_result: ExtensionObject::null(),
224        }
225    }
226
227    pub fn monitored_items_len(&self) -> usize {
228        self.monitored_items.len()
229    }
230
231    /// Creates monitored items on the specified subscription, returning the creation results
232    pub fn create_monitored_items(
233        &mut self,
234        server_state: &ServerState,
235        address_space: &AddressSpace,
236        now: &DateTimeUtc,
237        timestamps_to_return: TimestampsToReturn,
238        items_to_create: &[MonitoredItemCreateRequest],
239    ) -> Vec<MonitoredItemCreateResult> {
240        self.reset_lifetime_counter();
241
242        // Add items to the subscription if they're not already in its
243        items_to_create
244            .iter()
245            .map(|item_to_create| {
246                if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
247                    Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown)
248                } else {
249                    // TODO validate the attribute id for the type of node
250                    // TODO validate the index range for the node
251
252                    // Create a monitored item, if possible
253                    let monitored_item_id = self.next_monitored_item_id;
254                    match MonitoredItem::new(
255                        now,
256                        monitored_item_id,
257                        timestamps_to_return,
258                        server_state,
259                        item_to_create,
260                    ) {
261                        Ok(monitored_item) => {
262                            if server_state.max_monitored_items_per_sub == 0
263                                || self.monitored_items.len()
264                                    <= server_state.max_monitored_items_per_sub
265                            {
266                                let revised_sampling_interval = monitored_item.sampling_interval();
267                                let revised_queue_size = monitored_item.queue_size() as u32;
268                                // Validate the filter before registering the item
269                                match monitored_item.validate_filter(address_space) {
270                                    Ok(filter_result) => {
271                                        // Register the item with the subscription
272                                        self.monitored_items
273                                            .insert(monitored_item_id, monitored_item);
274                                        self.next_monitored_item_id += 1;
275                                        MonitoredItemCreateResult {
276                                            status_code: StatusCode::Good,
277                                            monitored_item_id,
278                                            revised_sampling_interval,
279                                            revised_queue_size,
280                                            filter_result,
281                                        }
282                                    }
283                                    Err(status_code) => {
284                                        Self::monitored_item_create_error(status_code)
285                                    }
286                                }
287                            } else {
288                                // Number of monitored items exceeds limit per sub
289                                Self::monitored_item_create_error(
290                                    StatusCode::BadTooManyMonitoredItems,
291                                )
292                            }
293                        }
294                        Err(status_code) => Self::monitored_item_create_error(status_code),
295                    }
296                }
297            })
298            .collect()
299    }
300
301    /// Modify the specified monitored items, returning a result for each
302    pub fn modify_monitored_items(
303        &mut self,
304        server_state: &ServerState,
305        address_space: &AddressSpace,
306        timestamps_to_return: TimestampsToReturn,
307        items_to_modify: &[MonitoredItemModifyRequest],
308    ) -> Vec<MonitoredItemModifyResult> {
309        self.reset_lifetime_counter();
310        items_to_modify
311            .iter()
312            .map(|item_to_modify| {
313                match self
314                    .monitored_items
315                    .get_mut(&item_to_modify.monitored_item_id)
316                {
317                    Some(monitored_item) => {
318                        // Try to change the monitored item according to the modify request
319                        let modify_result = monitored_item.modify(
320                            server_state,
321                            address_space,
322                            timestamps_to_return,
323                            item_to_modify,
324                        );
325                        match modify_result {
326                            Ok(filter_result) => MonitoredItemModifyResult {
327                                status_code: StatusCode::Good,
328                                revised_sampling_interval: monitored_item.sampling_interval(),
329                                revised_queue_size: monitored_item.queue_size() as u32,
330                                filter_result,
331                            },
332                            Err(err) => MonitoredItemModifyResult {
333                                status_code: err,
334                                revised_sampling_interval: 0f64,
335                                revised_queue_size: 0,
336                                filter_result: ExtensionObject::null(),
337                            },
338                        }
339                    }
340                    // Item does not exist
341                    None => MonitoredItemModifyResult {
342                        status_code: StatusCode::BadMonitoredItemIdInvalid,
343                        revised_sampling_interval: 0f64,
344                        revised_queue_size: 0,
345                        filter_result: ExtensionObject::null(),
346                    },
347                }
348            })
349            .collect()
350    }
351
352    /// Sets the monitoring mode on one monitored item
353    pub fn set_monitoring_mode(
354        &mut self,
355        monitored_item_id: u32,
356        monitoring_mode: MonitoringMode,
357    ) -> StatusCode {
358        if let Some(monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
359            monitored_item.set_monitoring_mode(monitoring_mode);
360            StatusCode::Good
361        } else {
362            StatusCode::BadMonitoredItemIdInvalid
363        }
364    }
365
366    /// Delete the specified monitored items (by item id), returning a status code for each
367    pub fn delete_monitored_items(&mut self, items_to_delete: &[u32]) -> Vec<StatusCode> {
368        self.reset_lifetime_counter();
369        items_to_delete
370            .iter()
371            .map(
372                |item_to_delete| match self.monitored_items.remove(item_to_delete) {
373                    Some(_) => StatusCode::Good,
374                    None => StatusCode::BadMonitoredItemIdInvalid,
375                },
376            )
377            .collect()
378    }
379
380    // Returns two vecs representing the server and client handles for each monitored item.
381    // Called from the GetMonitoredItems impl
382    pub fn get_handles(&self) -> (Vec<u32>, Vec<u32>) {
383        let server_handles = self
384            .monitored_items
385            .values()
386            .map(|i| i.monitored_item_id())
387            .collect();
388        let client_handles = self
389            .monitored_items
390            .values()
391            .map(|i| i.client_handle())
392            .collect();
393        (server_handles, client_handles)
394    }
395
396    /// Sets the resend data flag which means the next publish request will receive the latest value
397    /// of every monitored item whether it has changed in this cycle or not.
398    pub fn set_resend_data(&mut self) {
399        self.resend_data = true;
400    }
401
402    /// Tests if the publishing interval has elapsed since the last time this function in which case
403    /// it returns `true` and updates its internal state.
404    fn test_and_set_publishing_interval_elapsed(&mut self, now: &DateTimeUtc) -> bool {
405        // Look at the last expiration time compared to now and see if it matches
406        // or exceeds the publishing interval
407        let publishing_interval = super::duration_from_ms(self.publishing_interval);
408        let elapsed = now.signed_duration_since(self.last_time_publishing_interval_elapsed);
409        if elapsed >= publishing_interval {
410            self.last_time_publishing_interval_elapsed = *now;
411            true
412        } else {
413            false
414        }
415    }
416
417    /// Checks the subscription and monitored items for state change, messages. Returns `true`
418    /// if there are zero or more notifications waiting to be processed.
419    pub(crate) fn tick(
420        &mut self,
421        now: &DateTimeUtc,
422        address_space: &AddressSpace,
423        tick_reason: TickReason,
424        publishing_req_queued: bool,
425    ) {
426        // Check if the publishing interval has elapsed. Only checks on the tick timer.
427        let publishing_interval_elapsed = match tick_reason {
428            TickReason::ReceivePublishRequest => false,
429            TickReason::TickTimerFired => {
430                if self.state == SubscriptionState::Creating {
431                    true
432                } else if self.publishing_interval <= 0f64 {
433                    panic!("Publishing interval should have been revised to min interval")
434                } else {
435                    self.test_and_set_publishing_interval_elapsed(now)
436                }
437            }
438        };
439
440        // Do a tick on monitored items. Note that monitored items normally update when the interval
441        // elapses but they don't have to. So this is called every tick just to catch items with their
442        // own intervals.
443
444        let notification = match self.state {
445            SubscriptionState::Closed | SubscriptionState::Creating => None,
446            _ => {
447                let resend_data = self.resend_data;
448                self.tick_monitored_items(
449                    now,
450                    address_space,
451                    publishing_interval_elapsed,
452                    resend_data,
453                )
454            }
455        };
456        self.resend_data = false;
457
458        let notifications_available = !self.notifications.is_empty() || notification.is_some();
459        let more_notifications = self.notifications.len() > 1;
460
461        // If items have changed or subscription interval elapsed then we may have notifications
462        // to send or state to update
463        if notifications_available || publishing_interval_elapsed || publishing_req_queued {
464            // Update the internal state of the subscription based on what happened
465            let update_state_result = self.update_state(
466                tick_reason,
467                SubscriptionStateParams {
468                    publishing_req_queued,
469                    notifications_available,
470                    more_notifications,
471                    publishing_timer_expired: publishing_interval_elapsed,
472                },
473            );
474            trace!(
475                "subscription tick - update_state_result = {:?}",
476                update_state_result
477            );
478            self.handle_state_result(now, update_state_result, notification);
479        }
480    }
481
482    fn enqueue_notification(&mut self, notification: NotificationMessage) {
483        use std::u32;
484        // For sanity, check the sequence number is the expected sequence number.
485        let expected_sequence_number = if self.last_sequence_number == u32::MAX {
486            1
487        } else {
488            self.last_sequence_number + 1
489        };
490        if notification.sequence_number != expected_sequence_number {
491            panic!(
492                "Notification's sequence number is not sequential, expecting {}, got {}",
493                expected_sequence_number, notification.sequence_number
494            );
495        }
496        // debug!("Enqueuing notification {:?}", notification);
497        self.last_sequence_number = notification.sequence_number;
498        self.notifications.push_back(notification);
499    }
500
501    fn handle_state_result(
502        &mut self,
503        now: &DateTimeUtc,
504        update_state_result: UpdateStateResult,
505        notification: Option<NotificationMessage>,
506    ) {
507        // Now act on the state's action
508        match update_state_result.update_state_action {
509            UpdateStateAction::None => {
510                if let Some(ref notification) = notification {
511                    // Reset the next sequence number to the discarded notification
512                    let notification_sequence_number = notification.sequence_number;
513                    self.sequence_number.set_next(notification_sequence_number);
514                    debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result);
515                }
516                // Send nothing
517            }
518            UpdateStateAction::ReturnKeepAlive => {
519                if let Some(ref notification) = notification {
520                    // Reset the next sequence number to the discarded notification
521                    let notification_sequence_number = notification.sequence_number;
522                    self.sequence_number.set_next(notification_sequence_number);
523                    debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result);
524                }
525                // Send a keep alive
526                debug!("Sending keep alive response");
527                let notification = NotificationMessage::keep_alive(
528                    self.sequence_number.next(),
529                    DateTime::from(*now),
530                );
531                self.enqueue_notification(notification);
532            }
533            UpdateStateAction::ReturnNotifications => {
534                // Add the notification message to the queue
535                if let Some(notification) = notification {
536                    self.enqueue_notification(notification);
537                }
538            }
539            UpdateStateAction::SubscriptionCreated => {
540                if notification.is_some() {
541                    panic!("SubscriptionCreated got a notification");
542                }
543                // Subscription was created successfully
544                //                let notification = NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(now.clone()), StatusCode::Good);
545                //                self.enqueue_notification(notification);
546            }
547            UpdateStateAction::SubscriptionExpired => {
548                if notification.is_some() {
549                    panic!("SubscriptionExpired got a notification");
550                }
551                // Delete the monitored items, issue a status change for the subscription
552                debug!("Subscription status change to closed / timeout");
553                self.monitored_items.clear();
554                let notification = NotificationMessage::status_change(
555                    self.sequence_number.next(),
556                    DateTime::from(*now),
557                    StatusCode::BadTimeout,
558                );
559                self.enqueue_notification(notification);
560            }
561        }
562    }
563
564    pub(crate) fn take_notification(&mut self) -> Option<NotificationMessage> {
565        self.notifications.pop_front()
566    }
567
568    // See OPC UA Part 4 5.13.1.2 State Table
569    //
570    // This function implements the main guts of updating the subscription's state according to
571    // some input events and its existing internal state.
572    //
573    // Calls to the function will update the internal state of and return a tuple with any required
574    // actions.
575    //
576    // Note that some state events are handled outside of update_state. e.g. the subscription
577    // is created elsewhere which handles states 1, 2 and 3.
578    //
579    // Inputs:
580    //
581    // * publish_request - an optional publish request. May be used by subscription to remove acknowledged notifications
582    // * publishing_interval_elapsed - true if the publishing interval has elapsed
583    //
584    // Returns in order:
585    //
586    // * State id that handled this call. Useful for debugging which state handler triggered
587    // * Update state action - none, return notifications, return keep alive
588    // * Publishing request action - nothing, dequeue
589    //
590    pub(crate) fn update_state(
591        &mut self,
592        tick_reason: TickReason,
593        p: SubscriptionStateParams,
594    ) -> UpdateStateResult {
595        // This function is called when a publish request is received OR the timer expired, so getting
596        // both is invalid code somewhere
597        if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired {
598            panic!("Should not be possible for timer to have expired and received publish request at same time")
599        }
600
601        // Extra state debugging
602        {
603            use log::Level::Trace;
604            if log_enabled!(Trace) {
605                trace!(
606                    r#"State inputs:
607    subscription_id: {} / state: {:?}
608    tick_reason: {:?} / state_params: {:?}
609    publishing_enabled: {}
610    keep_alive_counter / lifetime_counter: {} / {}
611    message_sent: {}"#,
612                    self.subscription_id,
613                    self.state,
614                    tick_reason,
615                    p,
616                    self.publishing_enabled,
617                    self.keep_alive_counter,
618                    self.lifetime_counter,
619                    self.first_message_sent
620                );
621            }
622        }
623
624        // This is a state engine derived from OPC UA Part 4 Publish service and might look a
625        // little odd for that.
626        //
627        // Note in some cases, some of the actions have already happened outside of this function.
628        // For example, publish requests are already queued before we come in here and this function
629        // uses what its given. Likewise, this function does not "send" notifications, rather
630        // it returns them (if any) and it is up to the caller to send them
631
632        // more state tests that match on more than one state
633        match self.state {
634            SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive => {
635                if self.lifetime_counter == 1 {
636                    // State #27
637                    self.state = SubscriptionState::Closed;
638                    return UpdateStateResult::new(
639                        HandledState::Closed27,
640                        UpdateStateAction::SubscriptionExpired,
641                    );
642                }
643            }
644            _ => {
645                // DO NOTHING
646            }
647        }
648
649        match self.state {
650            SubscriptionState::Creating => {
651                // State #2
652                // CreateSubscription fails, return negative response
653                // Handled in message handler
654                // State #3
655                self.state = SubscriptionState::Normal;
656                self.first_message_sent = false;
657                return UpdateStateResult::new(
658                    HandledState::Create3,
659                    UpdateStateAction::SubscriptionCreated,
660                );
661            }
662            SubscriptionState::Normal => {
663                if tick_reason == TickReason::ReceivePublishRequest
664                    && (!self.publishing_enabled
665                        || (self.publishing_enabled && !p.more_notifications))
666                {
667                    // State #4
668                    return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None);
669                } else if tick_reason == TickReason::ReceivePublishRequest
670                    && self.publishing_enabled
671                    && p.more_notifications
672                {
673                    // State #5
674                    self.reset_lifetime_counter();
675                    self.first_message_sent = true;
676                    return UpdateStateResult::new(
677                        HandledState::Normal5,
678                        UpdateStateAction::ReturnNotifications,
679                    );
680                } else if p.publishing_timer_expired
681                    && p.publishing_req_queued
682                    && self.publishing_enabled
683                    && p.notifications_available
684                {
685                    // State #6
686                    self.reset_lifetime_counter();
687                    self.start_publishing_timer();
688                    self.first_message_sent = true;
689                    return UpdateStateResult::new(
690                        HandledState::IntervalElapsed6,
691                        UpdateStateAction::ReturnNotifications,
692                    );
693                } else if p.publishing_timer_expired
694                    && p.publishing_req_queued
695                    && !self.first_message_sent
696                    && (!self.publishing_enabled
697                        || (self.publishing_enabled && !p.notifications_available))
698                {
699                    // State #7
700                    self.reset_lifetime_counter();
701                    self.start_publishing_timer();
702                    self.first_message_sent = true;
703                    return UpdateStateResult::new(
704                        HandledState::IntervalElapsed7,
705                        UpdateStateAction::ReturnKeepAlive,
706                    );
707                } else if p.publishing_timer_expired
708                    && !p.publishing_req_queued
709                    && (!self.first_message_sent
710                        || (self.publishing_enabled && p.notifications_available))
711                {
712                    // State #8
713                    self.start_publishing_timer();
714                    self.state = SubscriptionState::Late;
715                    return UpdateStateResult::new(
716                        HandledState::IntervalElapsed8,
717                        UpdateStateAction::None,
718                    );
719                } else if p.publishing_timer_expired
720                    && self.first_message_sent
721                    && (!self.publishing_enabled
722                        || (self.publishing_enabled && !p.notifications_available))
723                {
724                    // State #9
725                    self.start_publishing_timer();
726                    self.reset_keep_alive_counter();
727                    self.state = SubscriptionState::KeepAlive;
728                    return UpdateStateResult::new(
729                        HandledState::IntervalElapsed9,
730                        UpdateStateAction::None,
731                    );
732                }
733            }
734            SubscriptionState::Late => {
735                if tick_reason == TickReason::ReceivePublishRequest
736                    && self.publishing_enabled
737                    && (p.notifications_available || p.more_notifications)
738                {
739                    // State #10
740                    self.reset_lifetime_counter();
741                    self.state = SubscriptionState::Normal;
742                    self.first_message_sent = true;
743                    return UpdateStateResult::new(
744                        HandledState::Late10,
745                        UpdateStateAction::ReturnNotifications,
746                    );
747                } else if tick_reason == TickReason::ReceivePublishRequest
748                    && (!self.publishing_enabled
749                        || (self.publishing_enabled
750                            && !p.notifications_available
751                            && !p.more_notifications))
752                {
753                    // State #11
754                    self.reset_lifetime_counter();
755                    self.state = SubscriptionState::KeepAlive;
756                    self.first_message_sent = true;
757                    return UpdateStateResult::new(
758                        HandledState::Late11,
759                        UpdateStateAction::ReturnKeepAlive,
760                    );
761                } else if p.publishing_timer_expired {
762                    // State #12
763                    self.start_publishing_timer();
764                    return UpdateStateResult::new(HandledState::Late12, UpdateStateAction::None);
765                }
766            }
767            SubscriptionState::KeepAlive => {
768                if tick_reason == TickReason::ReceivePublishRequest {
769                    // State #13
770                    return UpdateStateResult::new(
771                        HandledState::KeepAlive13,
772                        UpdateStateAction::None,
773                    );
774                } else if p.publishing_timer_expired
775                    && self.publishing_enabled
776                    && p.notifications_available
777                    && p.publishing_req_queued
778                {
779                    // State #14
780                    self.first_message_sent = true;
781                    self.state = SubscriptionState::Normal;
782                    return UpdateStateResult::new(
783                        HandledState::KeepAlive14,
784                        UpdateStateAction::ReturnNotifications,
785                    );
786                } else if p.publishing_timer_expired
787                    && p.publishing_req_queued
788                    && self.keep_alive_counter == 1
789                    && (!self.publishing_enabled
790                        || (self.publishing_enabled && p.notifications_available))
791                {
792                    // State #15
793                    self.start_publishing_timer();
794                    self.reset_keep_alive_counter();
795                    return UpdateStateResult::new(
796                        HandledState::KeepAlive15,
797                        UpdateStateAction::ReturnKeepAlive,
798                    );
799                } else if p.publishing_timer_expired
800                    && self.keep_alive_counter > 1
801                    && (!self.publishing_enabled
802                        || (self.publishing_enabled && !p.notifications_available))
803                {
804                    // State #16
805                    self.start_publishing_timer();
806                    self.keep_alive_counter -= 1;
807                    return UpdateStateResult::new(
808                        HandledState::KeepAlive16,
809                        UpdateStateAction::None,
810                    );
811                } else if p.publishing_timer_expired
812                    && !p.publishing_req_queued
813                    && (self.keep_alive_counter == 1
814                        || (self.keep_alive_counter > 1
815                            && self.publishing_enabled
816                            && p.notifications_available))
817                {
818                    // State #17
819                    self.start_publishing_timer();
820                    self.state = SubscriptionState::Late;
821                    return UpdateStateResult::new(
822                        HandledState::KeepAlive17,
823                        UpdateStateAction::None,
824                    );
825                }
826            }
827            _ => {
828                // DO NOTHING
829            }
830        }
831
832        UpdateStateResult::new(HandledState::None0, UpdateStateAction::None)
833    }
834
835    /// Iterate through the monitored items belonging to the subscription, calling tick on each in turn.
836    ///
837    /// Items that are in a reporting state, or triggered to report will be have their pending notifications
838    /// collected together when the publish interval elapsed flag is `true`.
839    ///
840    /// The function returns a `notifications` and a `more_notifications` boolean to indicate if the notifications
841    /// are available.
842    fn tick_monitored_items(
843        &mut self,
844        now: &DateTimeUtc,
845        address_space: &AddressSpace,
846        publishing_interval_elapsed: bool,
847        resend_data: bool,
848    ) -> Option<NotificationMessage> {
849        let mut triggered_items: BTreeSet<u32> = BTreeSet::new();
850        let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2);
851
852        for monitored_item in self.monitored_items.values_mut() {
853            // If this returns true then the monitored item wants to report its notification
854            let monitoring_mode = monitored_item.monitoring_mode();
855            match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data)
856            {
857                TickResult::ReportValueChanged => {
858                    if publishing_interval_elapsed {
859                        // If this monitored item has triggered items, then they need to be handled
860                        match monitoring_mode {
861                            MonitoringMode::Reporting => {
862                                // From triggering docs
863                                // If the monitoring mode of the triggering item is REPORTING, then it is reported when the
864                                // triggering item triggers the items to report.
865                                monitored_item.triggered_items().iter().for_each(|i| {
866                                    triggered_items.insert(*i);
867                                })
868                            }
869                            _ => {
870                                // Sampling should have gone in the other branch. Disabled shouldn't do anything.
871                                panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode);
872                            }
873                        }
874                        // Take some / all of the monitored item's pending notifications
875                        if let Some(mut item_notification_messages) =
876                            monitored_item.all_notifications()
877                        {
878                            monitored_item_notifications.append(&mut item_notification_messages);
879                        }
880                    }
881                }
882                TickResult::ValueChanged => {
883                    // The monitored item doesn't have changes to report but its value did change so it
884                    // is still necessary to check its triggered items.
885                    if publishing_interval_elapsed {
886                        match monitoring_mode {
887                            MonitoringMode::Sampling => {
888                                // If the monitoring mode of the triggering item is SAMPLING, then it is not reported when the
889                                // triggering item triggers the items to report.
890                                monitored_item.triggered_items().iter().for_each(|i| {
891                                    triggered_items.insert(*i);
892                                })
893                            }
894                            _ => {
895                                // Reporting should have gone in the other branch. Disabled shouldn't do anything.
896                                panic!("How can there be a value change when the mode is not sampling?");
897                            }
898                        }
899                    }
900                }
901                TickResult::NoChange => {
902                    // Ignore
903                }
904            }
905        }
906
907        // Are there any triggered items to force a change on?
908        triggered_items.iter().for_each(|i| {
909            if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) {
910                // Check the monitoring mode of the item to report
911                match monitored_item.monitoring_mode() {
912                    MonitoringMode::Sampling => {
913                        // If the monitoring mode of the item to report is SAMPLING, then it is reported when the
914                        // triggering item triggers the i tems to report.
915                        //
916                        // Call with the resend_data flag as true to force the monitored item to
917                        monitored_item.check_value(address_space, now, true);
918                        if let Some(mut notifications) = monitored_item.all_notifications() {
919                            monitored_item_notifications.append(&mut notifications);
920                        }
921                    }
922                    MonitoringMode::Reporting => {
923                        // If the monitoring mode of the item to report is REPORTING, this effectively causes the
924                        // triggering item to be ignored. All notifications of the items to report are sent after the
925                        // publishing interval expires.
926                        //
927                        // DO NOTHING
928                    }
929                    MonitoringMode::Disabled => {
930                        // DO NOTHING
931                    }
932                }
933            } else {
934                // It is possible that a monitored item contains a triggered id which has been deleted, so silently
935                // ignore that case.
936            }
937        });
938
939        // Produce a data change notification
940        if !monitored_item_notifications.is_empty() {
941            let next_sequence_number = self.sequence_number.next();
942
943            trace!(
944                "Create notification for subscription {}, sequence number {}",
945                self.subscription_id,
946                next_sequence_number
947            );
948
949            // Collect all datachange notifications
950            let data_change_notifications = monitored_item_notifications
951                .iter()
952                .filter(|v| matches!(v, Notification::MonitoredItemNotification(_)))
953                .map(|v| {
954                    if let Notification::MonitoredItemNotification(v) = v {
955                        v.clone()
956                    } else {
957                        panic!()
958                    }
959                })
960                .collect();
961
962            // Collect event notifications
963            let event_notifications = monitored_item_notifications
964                .iter()
965                .filter(|v| matches!(v, Notification::Event(_)))
966                .map(|v| {
967                    if let Notification::Event(v) = v {
968                        v.clone()
969                    } else {
970                        panic!()
971                    }
972                })
973                .collect();
974
975            // Make a notification
976            let notification = NotificationMessage::data_change(
977                next_sequence_number,
978                DateTime::from(*now),
979                data_change_notifications,
980                event_notifications,
981            );
982            Some(notification)
983        } else {
984            None
985        }
986    }
987
988    /// Reset the keep-alive counter to the maximum keep-alive count of the Subscription.
989    /// The maximum keep-alive count is set by the Client when the Subscription is created
990    /// and may be modified using the ModifySubscription Service
991    pub fn reset_keep_alive_counter(&mut self) {
992        self.keep_alive_counter = self.max_keep_alive_counter;
993    }
994
995    /// Reset the lifetime counter to the value specified for the life time of the subscription
996    /// in the create subscription service
997    pub fn reset_lifetime_counter(&mut self) {
998        self.lifetime_counter = self.max_lifetime_counter;
999    }
1000
1001    /// Start or restart the publishing timer and decrement the LifetimeCounter Variable.
1002    pub fn start_publishing_timer(&mut self) {
1003        self.lifetime_counter -= 1;
1004        trace!("Decrementing life time counter {}", self.lifetime_counter);
1005    }
1006
1007    pub fn subscription_id(&self) -> u32 {
1008        self.subscription_id
1009    }
1010
1011    pub fn lifetime_counter(&self) -> u32 {
1012        self.lifetime_counter
1013    }
1014
1015    #[cfg(test)]
1016    pub(crate) fn set_current_lifetime_count(&mut self, current_lifetime_count: u32) {
1017        self.lifetime_counter = current_lifetime_count;
1018    }
1019
1020    pub fn keep_alive_counter(&self) -> u32 {
1021        self.keep_alive_counter
1022    }
1023
1024    #[cfg(test)]
1025    pub(crate) fn set_keep_alive_counter(&mut self, keep_alive_counter: u32) {
1026        self.keep_alive_counter = keep_alive_counter;
1027    }
1028
1029    #[cfg(test)]
1030    pub(crate) fn state(&self) -> SubscriptionState {
1031        self.state
1032    }
1033
1034    #[cfg(test)]
1035    pub(crate) fn set_state(&mut self, state: SubscriptionState) {
1036        self.state = state;
1037    }
1038
1039    pub fn message_sent(&self) -> bool {
1040        self.first_message_sent
1041    }
1042
1043    #[cfg(test)]
1044    pub(crate) fn set_message_sent(&mut self, message_sent: bool) {
1045        self.first_message_sent = message_sent;
1046    }
1047
1048    pub fn publishing_interval(&self) -> Duration {
1049        self.publishing_interval
1050    }
1051
1052    pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
1053        self.publishing_interval = publishing_interval;
1054        self.reset_lifetime_counter();
1055    }
1056
1057    pub fn max_keep_alive_count(&self) -> u32 {
1058        self.max_keep_alive_counter
1059    }
1060
1061    pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
1062        self.max_keep_alive_counter = max_keep_alive_count;
1063    }
1064
1065    pub fn max_lifetime_count(&self) -> u32 {
1066        self.max_lifetime_counter
1067    }
1068
1069    pub(crate) fn set_max_lifetime_count(&mut self, max_lifetime_count: u32) {
1070        self.max_lifetime_counter = max_lifetime_count;
1071    }
1072
1073    pub fn priority(&self) -> u8 {
1074        self.priority
1075    }
1076
1077    pub(crate) fn set_priority(&mut self, priority: u8) {
1078        self.priority = priority;
1079    }
1080
1081    pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
1082        self.publishing_enabled = publishing_enabled;
1083        self.reset_lifetime_counter();
1084    }
1085
1086    pub(crate) fn set_diagnostics_on_drop(&mut self, diagnostics_on_drop: bool) {
1087        self.diagnostics_on_drop = diagnostics_on_drop;
1088    }
1089
1090    fn validate_triggered_items(
1091        &self,
1092        monitored_item_id: u32,
1093        items: &[u32],
1094    ) -> (Vec<StatusCode>, Vec<u32>) {
1095        // Monitored items can only trigger on other items in the subscription that exist
1096        let is_good_monitored_item =
1097            |i| self.monitored_items.contains_key(i) && *i != monitored_item_id;
1098        let is_good_monitored_item_result = |i| {
1099            if is_good_monitored_item(i) {
1100                StatusCode::Good
1101            } else {
1102                StatusCode::BadMonitoredItemIdInvalid
1103            }
1104        };
1105
1106        // Find monitored items that do or do not exist
1107        let results: Vec<StatusCode> = items.iter().map(is_good_monitored_item_result).collect();
1108        let items: Vec<u32> = items
1109            .iter()
1110            .filter(|i| is_good_monitored_item(i))
1111            .copied()
1112            .collect();
1113
1114        (results, items)
1115    }
1116
1117    /// Sets the triggering monitored items on a subscription. This function will validate that
1118    /// the items to add / remove actually exist and will only pass through existing monitored items
1119    /// onto the monitored item itself.
1120    pub(crate) fn set_triggering(
1121        &mut self,
1122        monitored_item_id: u32,
1123        items_to_add: &[u32],
1124        items_to_remove: &[u32],
1125    ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
1126        // Find monitored items that do or do not exist
1127        let (add_results, items_to_add) =
1128            self.validate_triggered_items(monitored_item_id, items_to_add);
1129        let (remove_results, items_to_remove) =
1130            self.validate_triggered_items(monitored_item_id, items_to_remove);
1131
1132        if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
1133            // Set the triggering monitored items
1134            monitored_item.set_triggering(items_to_add.as_slice(), items_to_remove.as_slice());
1135
1136            Ok((add_results, remove_results))
1137        } else {
1138            // This monitored item is unrecognized
1139            Err(StatusCode::BadMonitoredItemIdInvalid)
1140        }
1141    }
1142}