opcua_types/
notification_message.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
//! Helpers for NotificationMessage types
6use crate::{
7    date_time::DateTime,
8    diagnostic_info::DiagnosticInfo,
9    encoding::DecodingOptions,
10    extension_object::ExtensionObject,
11    node_id::Identifier,
12    node_ids::ObjectId,
13    service_types::{
14        DataChangeNotification, EventFieldList, EventNotificationList, MonitoredItemNotification,
15        NotificationMessage, StatusChangeNotification,
16    },
17    status_code::StatusCode,
18};
19
20impl NotificationMessage {
21    /// Create a notification message which contains data change AND / OR events. Calling this with
22    /// neither will panic. Notification data can have up to 2 elements to covers the case in
23    /// table 158 where a subscription contains monitored items for events and data.
24    pub fn data_change(
25        sequence_number: u32,
26        publish_time: DateTime,
27        data_change_notifications: Vec<MonitoredItemNotification>,
28        event_notifications: Vec<EventFieldList>,
29    ) -> NotificationMessage {
30        if data_change_notifications.is_empty() && event_notifications.is_empty() {
31            panic!("No notifications supplied to data_change()");
32        }
33
34        let mut notification_data = Vec::with_capacity(2);
35        if !data_change_notifications.is_empty() {
36            let data_change_notification = DataChangeNotification {
37                monitored_items: Some(data_change_notifications),
38                diagnostic_infos: None,
39            };
40            trace!("data change notification = {:?}", data_change_notification);
41            notification_data.push(ExtensionObject::from_encodable(
42                ObjectId::DataChangeNotification_Encoding_DefaultBinary,
43                &data_change_notification,
44            ));
45        }
46        if !event_notifications.is_empty() {
47            let event_notification_list = EventNotificationList {
48                events: Some(event_notifications),
49            };
50            trace!("event notification = {:?}", event_notification_list);
51            notification_data.push(ExtensionObject::from_encodable(
52                ObjectId::EventNotificationList_Encoding_DefaultBinary,
53                &event_notification_list,
54            ));
55        }
56
57        // Both data and events are serialized
58        NotificationMessage {
59            sequence_number,
60            publish_time,
61            notification_data: Some(notification_data),
62        }
63    }
64    /// Create a status change notification message
65    pub fn status_change(
66        sequence_number: u32,
67        publish_time: DateTime,
68        status: StatusCode,
69    ) -> NotificationMessage {
70        let status_change_notification = StatusChangeNotification {
71            status,
72            diagnostic_info: DiagnosticInfo::null(),
73        };
74        let notification_data = ExtensionObject::from_encodable(
75            ObjectId::StatusChangeNotification_Encoding_DefaultBinary,
76            &status_change_notification,
77        );
78        NotificationMessage {
79            sequence_number,
80            publish_time,
81            notification_data: Some(vec![notification_data]),
82        }
83    }
84
85    /// Create a keep-alive notification message
86    pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
87        NotificationMessage {
88            sequence_number,
89            publish_time,
90            notification_data: None,
91        }
92    }
93
94    fn process_notification(
95        n: &ExtensionObject,
96        decoding_options: &DecodingOptions,
97        data_changes: &mut Vec<DataChangeNotification>,
98        events: &mut Vec<EventNotificationList>,
99    ) {
100        if n.node_id.namespace == 0 {
101            if let Identifier::Numeric(id) = n.node_id.identifier {
102                if id == ObjectId::DataChangeNotification_Encoding_DefaultBinary as u32 {
103                    if let Ok(v) = n.decode_inner::<DataChangeNotification>(decoding_options) {
104                        data_changes.push(v);
105                    }
106                } else if id == ObjectId::EventNotificationList_Encoding_DefaultBinary as u32 {
107                    if let Ok(v) = n.decode_inner::<EventNotificationList>(decoding_options) {
108                        events.push(v);
109                    }
110                } else if id == ObjectId::StatusChangeNotification_Encoding_DefaultBinary as u32 {
111                    debug!("Ignoring a StatusChangeNotification");
112                } else {
113                    debug!("Ignoring a notification of type {:?}", n.node_id);
114                }
115            }
116        }
117    }
118
119    /// Extract notifications from the message. Unrecognized / unparseable notifications will be
120    /// ignored. If there are no notifications, the function will return `None`.
121    pub fn notifications(
122        &self,
123        decoding_options: &DecodingOptions,
124    ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
125        if let Some(ref notification_data) = self.notification_data {
126            let mut data_changes = Vec::with_capacity(notification_data.len());
127            let mut events = Vec::with_capacity(notification_data.len());
128
129            // Build up the notifications
130            notification_data.iter().for_each(|n| {
131                Self::process_notification(n, decoding_options, &mut data_changes, &mut events);
132            });
133            if data_changes.is_empty() && events.is_empty() {
134                None
135            } else {
136                Some((data_changes, events))
137            }
138        } else {
139            None
140        }
141    }
142}