1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::{
date_time::DateTime,
diagnostic_info::DiagnosticInfo,
encoding::DecodingOptions,
extension_object::ExtensionObject,
node_id::Identifier,
node_ids::ObjectId,
service_types::{
DataChangeNotification, EventFieldList, EventNotificationList, MonitoredItemNotification,
NotificationMessage, StatusChangeNotification,
},
status_code::StatusCode,
};
impl NotificationMessage {
    /// Creates a notification message carrying data change notifications, event
    /// notifications, or both.
    ///
    /// Each non-empty input is wrapped in its own `ExtensionObject`: the
    /// `DataChangeNotification` first, followed by the `EventNotificationList`. The
    /// resulting `notification_data` therefore holds one or two elements.
    ///
    /// # Panics
    ///
    /// Panics if both `data_change_notifications` and `event_notifications` are empty.
    pub fn data_change(
        sequence_number: u32,
        publish_time: DateTime,
        data_change_notifications: Vec<MonitoredItemNotification>,
        event_notifications: Vec<EventFieldList>,
    ) -> NotificationMessage {
        if data_change_notifications.is_empty() && event_notifications.is_empty() {
            panic!("No notifications supplied to data_change()");
        }

        // At most two entries: one for data changes and one for events.
        let mut notification_data = Vec::with_capacity(2);

        if !data_change_notifications.is_empty() {
            let data_change_notification = DataChangeNotification {
                monitored_items: Some(data_change_notifications),
                diagnostic_infos: None,
            };
            trace!("data change notification = {:?}", data_change_notification);
            notification_data.push(ExtensionObject::from_encodable(
                ObjectId::DataChangeNotification_Encoding_DefaultBinary,
                &data_change_notification,
            ));
        }

        if !event_notifications.is_empty() {
            let event_notification_list = EventNotificationList {
                events: Some(event_notifications),
            };
            trace!("event notification = {:?}", event_notification_list);
            notification_data.push(ExtensionObject::from_encodable(
                ObjectId::EventNotificationList_Encoding_DefaultBinary,
                &event_notification_list,
            ));
        }

        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: Some(notification_data),
        }
    }

    /// Creates a notification message whose only payload is a
    /// `StatusChangeNotification` with the supplied `status` and a null diagnostic info.
    pub fn status_change(
        sequence_number: u32,
        publish_time: DateTime,
        status: StatusCode,
    ) -> NotificationMessage {
        let status_change_notification = StatusChangeNotification {
            status,
            diagnostic_info: DiagnosticInfo::null(),
        };
        let notification_data = ExtensionObject::from_encodable(
            ObjectId::StatusChangeNotification_Encoding_DefaultBinary,
            &status_change_notification,
        );
        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: Some(vec![notification_data]),
        }
    }

    /// Creates a keep-alive message, i.e. one whose `notification_data` is `None`.
    pub fn keep_alive(sequence_number: u32, publish_time: DateTime) -> NotificationMessage {
        NotificationMessage {
            sequence_number,
            publish_time,
            notification_data: None,
        }
    }

    /// Decodes a single notification and appends it to `data_changes` or `events`
    /// according to the encoding id carried by its node id.
    ///
    /// Anything that is not a data change or event list is skipped, as is any payload
    /// that fails to decode. Every skipped notification is reported through `debug!`
    /// so that dropped data is never completely invisible.
    fn process_notification(
        n: &ExtensionObject,
        decoding_options: &DecodingOptions,
        data_changes: &mut Vec<DataChangeNotification>,
        events: &mut Vec<EventNotificationList>,
    ) {
        // Only numeric identifiers in namespace 0 are compared against the known
        // encoding ids below; everything else is logged and skipped.
        let id = match n.node_id.identifier {
            Identifier::Numeric(id) if n.node_id.namespace == 0 => id,
            _ => {
                debug!("Ignoring a notification of type {:?}", n.node_id);
                return;
            }
        };

        if id == ObjectId::DataChangeNotification_Encoding_DefaultBinary as u32 {
            match n.decode_inner::<DataChangeNotification>(decoding_options) {
                Ok(v) => {
                    data_changes.push(v);
                }
                Err(err) => {
                    debug!("Failed to decode a DataChangeNotification: {:?}", err);
                }
            }
        } else if id == ObjectId::EventNotificationList_Encoding_DefaultBinary as u32 {
            match n.decode_inner::<EventNotificationList>(decoding_options) {
                Ok(v) => {
                    events.push(v);
                }
                Err(err) => {
                    debug!("Failed to decode an EventNotificationList: {:?}", err);
                }
            }
        } else if id == ObjectId::StatusChangeNotification_Encoding_DefaultBinary as u32 {
            debug!("Ignoring a StatusChangeNotification");
        } else {
            debug!("Ignoring a notification of type {:?}", n.node_id);
        }
    }

    /// Extracts the data change notifications and event notification lists contained
    /// in this message.
    ///
    /// Returns `None` when the message has no notification data, or when none of its
    /// entries decoded into a data change or event list. Status change notifications
    /// are not returned. Otherwise returns the decoded data changes and events, in
    /// the order they appear in the message.
    pub fn notifications(
        &self,
        decoding_options: &DecodingOptions,
    ) -> Option<(Vec<DataChangeNotification>, Vec<EventNotificationList>)> {
        let notification_data = self.notification_data.as_ref()?;

        let mut data_changes = Vec::with_capacity(notification_data.len());
        let mut events = Vec::with_capacity(notification_data.len());
        for n in notification_data {
            Self::process_notification(n, decoding_options, &mut data_changes, &mut events);
        }

        if data_changes.is_empty() && events.is_empty() {
            None
        } else {
            Some((data_changes, events))
        }
    }
}