opcua_types/
notification_message.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2024 Adam Lock
4//! Helpers for NotificationMessage types
5use tracing::{debug, trace};
6
7use crate::{
8    date_time::DateTime, diagnostic_info::DiagnosticInfo, extension_object::ExtensionObject,
9    match_extension_object_owned, status_code::StatusCode, DataChangeNotification, EventFieldList,
10    EventNotificationList, MonitoredItemNotification, NotificationMessage,
11    StatusChangeNotification,
12};
13
14impl NotificationMessage {
15    /// Create a notification message which contains data change AND / OR events. Calling this with
16    /// neither will panic. Notification data can have up to 2 elements to covers the case in
17    /// table 158 where a subscription contains monitored items for events and data.
18    pub fn data_change(
19        sequence_number: u32,
20        publish_time: DateTime,
21        data_change_notifications: Vec<MonitoredItemNotification>,
22        event_notifications: Vec<EventFieldList>,
23    ) -> NotificationMessage {
24        if data_change_notifications.is_empty() && event_notifications.is_empty() {
25            panic!("No notifications supplied to data_change()");
26        }
27
28        let mut notification_data = Vec::with_capacity(2);
29        if !data_change_notifications.is_empty() {
30            let data_change_notification = DataChangeNotification {
31                monitored_items: Some(data_change_notifications),
32                diagnostic_infos: None,
33            };
34            trace!("data change notification = {:?}", data_change_notification);
35            notification_data.push(ExtensionObject::from_message(data_change_notification));
36        }
37        if !event_notifications.is_empty() {
38            let event_notification_list = EventNotificationList {
39                events: Some(event_notifications),
40            };
41            trace!("event notification = {:?}", event_notification_list);
42            notification_data.push(ExtensionObject::from_message(event_notification_list));
43        }
44
45        // Both data and events are serialized
46        NotificationMessage {
47            sequence_number,
48            publish_time,
49            notification_data: Some(notification_data),
50        }
51    }
52    /// Create a status change notification message
53    pub fn status_change(
54        sequence_number: u32,
55        publish_time: DateTime,
56        status: StatusCode,
57    ) -> NotificationMessage {
58        let status_change_notification = StatusChangeNotification {
59            status,
60            diagnostic_info: DiagnosticInfo::null(),
61        };
62        let notification_data = ExtensionObject::from_message(status_change_notification);
63        NotificationMessage {
64            sequence_number,
65            publish_time,
66            notification_data: Some(vec![notification_data]),
67        }
68    }
69
70    /// Create a keep-alive notification message
71    pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
72        NotificationMessage {
73            sequence_number,
74            publish_time,
75            notification_data: None,
76        }
77    }
78
79    fn process_notification(
80        n: ExtensionObject,
81        data_changes: &mut Vec<DataChangeNotification>,
82        events: &mut Vec<EventNotificationList>,
83    ) {
84        match_extension_object_owned!(n,
85            n: DataChangeNotification => data_changes.push(n),
86            n: EventNotificationList => events.push(n),
87            _ => {
88                if n.inner_is::<StatusChangeNotification>() {
89                    debug!("Ignoring a StatusChangeNotification");
90                } else {
91                    debug!("Ignoring a notification of type {:?}", n.binary_type_id());
92                }
93            }
94        )
95    }
96
97    /// Extract notifications from the message. Unrecognized / unparseable notifications will be
98    /// ignored. If there are no notifications, the function will return `None`.
99    pub fn into_notifications(
100        self,
101    ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
102        if let Some(notification_data) = self.notification_data {
103            let mut data_changes = Vec::with_capacity(notification_data.len());
104            let mut events = Vec::with_capacity(notification_data.len());
105
106            // Build up the notifications
107            notification_data.into_iter().for_each(|n| {
108                Self::process_notification(n, &mut data_changes, &mut events);
109            });
110            if data_changes.is_empty() && events.is_empty() {
111                None
112            } else {
113                Some((data_changes, events))
114            }
115        } else {
116            None
117        }
118    }
119}