opcua_types/
notification_message.rs1use crate::{
7 date_time::DateTime,
8 diagnostic_info::DiagnosticInfo,
9 encoding::DecodingOptions,
10 extension_object::ExtensionObject,
11 node_id::Identifier,
12 node_ids::ObjectId,
13 service_types::{
14 DataChangeNotification, EventFieldList, EventNotificationList, MonitoredItemNotification,
15 NotificationMessage, StatusChangeNotification,
16 },
17 status_code::StatusCode,
18};
19
20impl NotificationMessage {
21 pub fn data_change(
25 sequence_number: u32,
26 publish_time: DateTime,
27 data_change_notifications: Vec<MonitoredItemNotification>,
28 event_notifications: Vec<EventFieldList>,
29 ) -> NotificationMessage {
30 if data_change_notifications.is_empty() && event_notifications.is_empty() {
31 panic!("No notifications supplied to data_change()");
32 }
33
34 let mut notification_data = Vec::with_capacity(2);
35 if !data_change_notifications.is_empty() {
36 let data_change_notification = DataChangeNotification {
37 monitored_items: Some(data_change_notifications),
38 diagnostic_infos: None,
39 };
40 trace!("data change notification = {:?}", data_change_notification);
41 notification_data.push(ExtensionObject::from_encodable(
42 ObjectId::DataChangeNotification_Encoding_DefaultBinary,
43 &data_change_notification,
44 ));
45 }
46 if !event_notifications.is_empty() {
47 let event_notification_list = EventNotificationList {
48 events: Some(event_notifications),
49 };
50 trace!("event notification = {:?}", event_notification_list);
51 notification_data.push(ExtensionObject::from_encodable(
52 ObjectId::EventNotificationList_Encoding_DefaultBinary,
53 &event_notification_list,
54 ));
55 }
56
57 NotificationMessage {
59 sequence_number,
60 publish_time,
61 notification_data: Some(notification_data),
62 }
63 }
64 pub fn status_change(
66 sequence_number: u32,
67 publish_time: DateTime,
68 status: StatusCode,
69 ) -> NotificationMessage {
70 let status_change_notification = StatusChangeNotification {
71 status,
72 diagnostic_info: DiagnosticInfo::null(),
73 };
74 let notification_data = ExtensionObject::from_encodable(
75 ObjectId::StatusChangeNotification_Encoding_DefaultBinary,
76 &status_change_notification,
77 );
78 NotificationMessage {
79 sequence_number,
80 publish_time,
81 notification_data: Some(vec![notification_data]),
82 }
83 }
84
85 pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
87 NotificationMessage {
88 sequence_number,
89 publish_time,
90 notification_data: None,
91 }
92 }
93
94 fn process_notification(
95 n: &ExtensionObject,
96 decoding_options: &DecodingOptions,
97 data_changes: &mut Vec<DataChangeNotification>,
98 events: &mut Vec<EventNotificationList>,
99 ) {
100 if n.node_id.namespace == 0 {
101 if let Identifier::Numeric(id) = n.node_id.identifier {
102 if id == ObjectId::DataChangeNotification_Encoding_DefaultBinary as u32 {
103 if let Ok(v) = n.decode_inner::<DataChangeNotification>(decoding_options) {
104 data_changes.push(v);
105 }
106 } else if id == ObjectId::EventNotificationList_Encoding_DefaultBinary as u32 {
107 if let Ok(v) = n.decode_inner::<EventNotificationList>(decoding_options) {
108 events.push(v);
109 }
110 } else if id == ObjectId::StatusChangeNotification_Encoding_DefaultBinary as u32 {
111 debug!("Ignoring a StatusChangeNotification");
112 } else {
113 debug!("Ignoring a notification of type {:?}", n.node_id);
114 }
115 }
116 }
117 }
118
119 pub fn notifications(
122 &self,
123 decoding_options: &DecodingOptions,
124 ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
125 if let Some(ref notification_data) = self.notification_data {
126 let mut data_changes = Vec::with_capacity(notification_data.len());
127 let mut events = Vec::with_capacity(notification_data.len());
128
129 notification_data.iter().for_each(|n| {
131 Self::process_notification(n, decoding_options, &mut data_changes, &mut events);
132 });
133 if data_changes.is_empty() && events.is_empty() {
134 None
135 } else {
136 Some((data_changes, events))
137 }
138 } else {
139 None
140 }
141 }
142}