1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// OPCUA for Rust
// SPDX-License-Identifier: MPL-2.0
// Copyright (C) 2017-2022 Adam Lock

///! Helpers for NotificationMessage types
use crate::{
    date_time::DateTime,
    diagnostic_info::DiagnosticInfo,
    encoding::DecodingOptions,
    extension_object::ExtensionObject,
    node_id::Identifier,
    node_ids::ObjectId,
    service_types::{
        DataChangeNotification, EventFieldList, EventNotificationList, MonitoredItemNotification,
        NotificationMessage, StatusChangeNotification,
    },
    status_code::StatusCode,
};

impl NotificationMessage {
    /// Create a notification message which contains data change AND / OR events. Calling this with
    /// neither will panic. Notification data can have up to 2 elements to covers the case in
    /// table 158 where a subscription contains monitored items for events and data.
    pub fn data_change(
        sequence_number: u32,
        publish_time: DateTime,
        data_change_notifications: Vec<MonitoredItemNotification>,
        event_notifications: Vec<EventFieldList>,
    ) -> NotificationMessage {
        if data_change_notifications.is_empty() && event_notifications.is_empty() {
            panic!("No notifications supplied to data_change()");
        }

        let mut notification_data = Vec::with_capacity(2);
        if !data_change_notifications.is_empty() {
            let data_change_notification = DataChangeNotification {
                monitored_items: Some(data_change_notifications),
                diagnostic_infos: None,
            };
            trace!("data change notification = {:?}", data_change_notification);
            notification_data.push(ExtensionObject::from_encodable(
                ObjectId::DataChangeNotification_Encoding_DefaultBinary,
                &data_change_notification,
            ));
        }
        if !event_notifications.is_empty() {
            let event_notification_list = EventNotificationList {
                events: Some(event_notifications),
            };
            trace!("event notification = {:?}", event_notification_list);
            notification_data.push(ExtensionObject::from_encodable(
                ObjectId::EventNotificationList_Encoding_DefaultBinary,
                &event_notification_list,
            ));
        }

        // Both data and events are serialized
        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: Some(notification_data),
        }
    }
    /// Create a status change notification message
    pub fn status_change(
        sequence_number: u32,
        publish_time: DateTime,
        status: StatusCode,
    ) -> NotificationMessage {
        let status_change_notification = StatusChangeNotification {
            status,
            diagnostic_info: DiagnosticInfo::null(),
        };
        let notification_data = ExtensionObject::from_encodable(
            ObjectId::StatusChangeNotification_Encoding_DefaultBinary,
            &status_change_notification,
        );
        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: Some(vec![notification_data]),
        }
    }

    /// Create a keep-alive notification message
    pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: None,
        }
    }

    fn process_notification(
        n: &ExtensionObject,
        decoding_options: &DecodingOptions,
        data_changes: &mut Vec<DataChangeNotification>,
        events: &mut Vec<EventNotificationList>,
    ) {
        if n.node_id.namespace == 0 {
            if let Identifier::Numeric(id) = n.node_id.identifier {
                if id == ObjectId::DataChangeNotification_Encoding_DefaultBinary as u32 {
                    if let Ok(v) = n.decode_inner::<DataChangeNotification>(decoding_options) {
                        data_changes.push(v);
                    }
                } else if id == ObjectId::EventNotificationList_Encoding_DefaultBinary as u32 {
                    if let Ok(v) = n.decode_inner::<EventNotificationList>(decoding_options) {
                        events.push(v);
                    }
                } else if id == ObjectId::StatusChangeNotification_Encoding_DefaultBinary as u32 {
                    debug!("Ignoring a StatusChangeNotification");
                } else {
                    debug!("Ignoring a notification of type {:?}", n.node_id);
                }
            }
        }
    }

    /// Extract notifications from the message. Unrecognized / unparseable notifications will be
    /// ignored. If there are no notifications, the function will return `None`.
    pub fn notifications(
        &self,
        decoding_options: &DecodingOptions,
    ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
        if let Some(ref notification_data) = self.notification_data {
            let mut data_changes = Vec::with_capacity(notification_data.len());
            let mut events = Vec::with_capacity(notification_data.len());

            // Build up the notifications
            notification_data.iter().for_each(|n| {
                Self::process_notification(n, decoding_options, &mut data_changes, &mut events);
            });
            if data_changes.is_empty() && events.is_empty() {
                None
            } else {
                Some((data_changes, events))
            }
        } else {
            None
        }
    }
}