opcua_types/
notification_message.rs1use tracing::{debug, trace};
6
7use crate::{
8 date_time::DateTime, diagnostic_info::DiagnosticInfo, extension_object::ExtensionObject,
9 match_extension_object_owned, status_code::StatusCode, DataChangeNotification, EventFieldList,
10 EventNotificationList, MonitoredItemNotification, NotificationMessage,
11 StatusChangeNotification,
12};
13
14impl NotificationMessage {
15 pub fn data_change(
19 sequence_number: u32,
20 publish_time: DateTime,
21 data_change_notifications: Vec<MonitoredItemNotification>,
22 event_notifications: Vec<EventFieldList>,
23 ) -> NotificationMessage {
24 if data_change_notifications.is_empty() && event_notifications.is_empty() {
25 panic!("No notifications supplied to data_change()");
26 }
27
28 let mut notification_data = Vec::with_capacity(2);
29 if !data_change_notifications.is_empty() {
30 let data_change_notification = DataChangeNotification {
31 monitored_items: Some(data_change_notifications),
32 diagnostic_infos: None,
33 };
34 trace!("data change notification = {:?}", data_change_notification);
35 notification_data.push(ExtensionObject::from_message(data_change_notification));
36 }
37 if !event_notifications.is_empty() {
38 let event_notification_list = EventNotificationList {
39 events: Some(event_notifications),
40 };
41 trace!("event notification = {:?}", event_notification_list);
42 notification_data.push(ExtensionObject::from_message(event_notification_list));
43 }
44
45 NotificationMessage {
47 sequence_number,
48 publish_time,
49 notification_data: Some(notification_data),
50 }
51 }
52 pub fn status_change(
54 sequence_number: u32,
55 publish_time: DateTime,
56 status: StatusCode,
57 ) -> NotificationMessage {
58 let status_change_notification = StatusChangeNotification {
59 status,
60 diagnostic_info: DiagnosticInfo::null(),
61 };
62 let notification_data = ExtensionObject::from_message(status_change_notification);
63 NotificationMessage {
64 sequence_number,
65 publish_time,
66 notification_data: Some(vec![notification_data]),
67 }
68 }
69
70 pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
72 NotificationMessage {
73 sequence_number,
74 publish_time,
75 notification_data: None,
76 }
77 }
78
79 fn process_notification(
80 n: ExtensionObject,
81 data_changes: &mut Vec<DataChangeNotification>,
82 events: &mut Vec<EventNotificationList>,
83 ) {
84 match_extension_object_owned!(n,
85 n: DataChangeNotification => data_changes.push(n),
86 n: EventNotificationList => events.push(n),
87 _ => {
88 if n.inner_is::<StatusChangeNotification>() {
89 debug!("Ignoring a StatusChangeNotification");
90 } else {
91 debug!("Ignoring a notification of type {:?}", n.binary_type_id());
92 }
93 }
94 )
95 }
96
97 pub fn into_notifications(
100 self,
101 ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
102 if let Some(notification_data) = self.notification_data {
103 let mut data_changes = Vec::with_capacity(notification_data.len());
104 let mut events = Vec::with_capacity(notification_data.len());
105
106 notification_data.into_iter().for_each(|n| {
108 Self::process_notification(n, &mut data_changes, &mut events);
109 });
110 if data_changes.is_empty() && events.is_empty() {
111 None
112 } else {
113 Some((data_changes, events))
114 }
115 } else {
116 None
117 }
118 }
119}