use std::collections::{BTreeSet, VecDeque};
use std::result::Result;
use opcua_types::{
*,
node_ids::ObjectId,
service_types::{
DataChangeFilter, EventFieldList, EventFilter, MonitoredItemCreateRequest, MonitoredItemModifyRequest,
MonitoredItemNotification, ReadValueId, TimestampsToReturn,
},
status_code::StatusCode,
};
use crate::{
address_space::{
AddressSpace,
EventNotifier,
node::Node,
},
constants,
events::event_filter,
};
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum Notification {
MonitoredItemNotification(MonitoredItemNotification),
Event(EventFieldList),
}
impl From<MonitoredItemNotification> for Notification {
fn from(v: MonitoredItemNotification) -> Self {
Notification::MonitoredItemNotification(v)
}
}
impl From<EventFieldList> for Notification {
fn from(v: EventFieldList) -> Self {
Notification::Event(v)
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) enum FilterType {
None,
DataChangeFilter(DataChangeFilter),
EventFilter(EventFilter),
}
impl FilterType {
pub fn from_filter(filter: &ExtensionObject) -> Result<FilterType, StatusCode> {
let filter_type_id = &filter.node_id;
if filter_type_id.is_null() {
Ok(FilterType::None)
} else if let Ok(filter_type_id) = filter_type_id.as_object_id() {
match filter_type_id {
ObjectId::DataChangeFilter_Encoding_DefaultBinary => {
let decoding_limits = DecodingLimits::minimal();
Ok(FilterType::DataChangeFilter(filter.decode_inner::<DataChangeFilter>(&decoding_limits)?))
}
ObjectId::EventFilter_Encoding_DefaultBinary => {
let decoding_limits = DecodingLimits::default();
Ok(FilterType::EventFilter(filter.decode_inner::<EventFilter>(&decoding_limits)?))
}
_ => {
error!("Requested data filter type is not supported, {:?}", filter_type_id);
Err(StatusCode::BadFilterNotAllowed)
}
}
} else {
error!("Requested data filter type is not an object id, {:?}", filter_type_id);
Err(StatusCode::BadFilterNotAllowed)
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct MonitoredItem {
monitored_item_id: u32,
item_to_monitor: ReadValueId,
monitoring_mode: MonitoringMode,
triggered_items: BTreeSet<u32>,
client_handle: u32,
sampling_interval: Duration,
filter: FilterType,
discard_oldest: bool,
queue_size: usize,
notification_queue: VecDeque<Notification>,
queue_overflow: bool,
timestamps_to_return: TimestampsToReturn,
last_sample_time: DateTimeUtc,
last_data_value: Option<DataValue>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum TickResult {
ReportValueChanged,
ValueChanged,
NoChange,
}
impl MonitoredItem {
pub fn new(now: &DateTimeUtc, monitored_item_id: u32, timestamps_to_return: TimestampsToReturn, request: &MonitoredItemCreateRequest) -> Result<MonitoredItem, StatusCode> {
let filter = FilterType::from_filter(&request.requested_parameters.filter)?;
let sampling_interval = Self::sanitize_sampling_interval(request.requested_parameters.sampling_interval);
let queue_size = Self::sanitize_queue_size(request.requested_parameters.queue_size as usize);
Ok(MonitoredItem {
monitored_item_id,
item_to_monitor: request.item_to_monitor.clone(),
monitoring_mode: request.monitoring_mode,
triggered_items: BTreeSet::new(),
client_handle: request.requested_parameters.client_handle,
sampling_interval,
filter,
discard_oldest: request.requested_parameters.discard_oldest,
timestamps_to_return,
last_sample_time: now.clone(),
last_data_value: None,
queue_size,
notification_queue: VecDeque::with_capacity(queue_size),
queue_overflow: false,
})
}
pub fn modify(&mut self, address_space: &AddressSpace, timestamps_to_return: TimestampsToReturn, request: &MonitoredItemModifyRequest) -> Result<ExtensionObject, StatusCode> {
self.timestamps_to_return = timestamps_to_return;
self.filter = FilterType::from_filter(&request.requested_parameters.filter)?;
self.sampling_interval = Self::sanitize_sampling_interval(request.requested_parameters.sampling_interval);
self.queue_size = Self::sanitize_queue_size(request.requested_parameters.queue_size as usize);
self.client_handle = request.requested_parameters.client_handle;
self.discard_oldest = request.requested_parameters.discard_oldest;
if self.notification_queue.len() > self.queue_size {
let discard = self.queue_size - self.notification_queue.len();
let _ = self.notification_queue.drain(0..discard);
self.notification_queue.shrink_to_fit();
} else if self.notification_queue.capacity() < self.queue_size {
let extra_capacity = self.queue_size - self.notification_queue.capacity();
self.notification_queue.reserve(extra_capacity);
}
self.validate_filter(address_space)
}
pub fn set_triggering(&mut self, items_to_add: &[u32], items_to_remove: &[u32]) {
items_to_remove.iter().for_each(|i| { self.triggered_items.remove(i); });
items_to_add.iter().for_each(|i| { self.triggered_items.insert(*i); });
}
pub fn validate_filter(&self, address_space: &AddressSpace) -> Result<ExtensionObject, StatusCode> {
let filter_result = if let FilterType::EventFilter(ref event_filter) = self.filter {
let filter_result = event_filter::validate(event_filter, address_space)?;
ExtensionObject::from_encodable(ObjectId::EventFilterResult_Encoding_DefaultBinary, &filter_result)
} else {
ExtensionObject::null()
};
Ok(filter_result)
}
pub fn tick(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, publishing_interval_elapsed: bool, resend_data: bool) -> TickResult {
if self.monitoring_mode == MonitoringMode::Disabled {
TickResult::NoChange
} else {
let check_value = if resend_data {
true
} else if self.sampling_interval < 0f64 {
publishing_interval_elapsed
} else if self.sampling_interval == 0f64 {
true
} else {
let sampling_interval = super::duration_from_ms(self.sampling_interval);
let elapsed = now.signed_duration_since(self.last_sample_time);
elapsed >= sampling_interval
};
let value_changed = check_value && {
let first_tick = !self.is_event_filter() && self.last_data_value.is_none();
let value_changed = self.check_value(address_space, now, resend_data);
first_tick || value_changed || !self.notification_queue.is_empty()
};
if value_changed {
if self.monitoring_mode == MonitoringMode::Reporting {
TickResult::ReportValueChanged
} else {
TickResult::ValueChanged
}
} else {
TickResult::NoChange
}
}
}
fn get_event_notifier(node: &dyn Node) -> EventNotifier {
if let Some(v) = node.get_attribute(TimestampsToReturn::Neither, AttributeId::EventNotifier, NumericRange::None, &QualifiedName::null()) {
if let Variant::Byte(v) = v.value.unwrap_or(0u8.into()) {
EventNotifier::from_bits_truncate(v)
} else {
EventNotifier::empty()
}
} else {
EventNotifier::empty()
}
}
fn check_for_events(&mut self, address_space: &AddressSpace, happened_since: &DateTimeUtc, node: &dyn Node) -> bool {
match self.filter {
FilterType::EventFilter(ref filter) => {
if Self::get_event_notifier(node).contains(EventNotifier::SUBSCRIBE_TO_EVENTS) {
let object_id = node.node_id();
if let Some(events) = event_filter::evaluate(&object_id, filter, address_space, &happened_since, self.client_handle) {
events.into_iter().for_each(|event| self.enqueue_notification_message(event));
true
} else {
false
}
} else {
false
}
}
_ => panic!()
}
}
fn check_for_data_change(&mut self, _address_space: &AddressSpace, resend_data: bool, attribute_id: AttributeId, node: &dyn Node) -> bool {
let data_value = node.get_attribute(TimestampsToReturn::Neither, attribute_id, NumericRange::None, &QualifiedName::null());
if let Some(mut data_value) = data_value {
let data_change = if resend_data {
true
} else if let Some(ref last_data_value) = self.last_data_value {
match self.filter {
FilterType::None => {
data_value.value != last_data_value.value
}
FilterType::DataChangeFilter(ref filter) => {
!filter.compare(&data_value, last_data_value, None)
}
_ => {
false
}
}
} else {
trace!("No last data value so item has changed, node {:?}", self.item_to_monitor.node_id);
true
};
if data_change {
trace!("Data change on item -, node {:?}, data_value = {:?}", self.item_to_monitor.node_id, data_value);
self.last_data_value = Some(data_value.clone());
match self.timestamps_to_return {
TimestampsToReturn::Neither | TimestampsToReturn::Invalid => {
data_value.source_timestamp = None;
data_value.source_picoseconds = None;
data_value.server_timestamp = None;
data_value.server_picoseconds = None
}
TimestampsToReturn::Server => {
data_value.source_timestamp = None;
data_value.source_picoseconds = None;
}
TimestampsToReturn::Source => {
data_value.server_timestamp = None;
data_value.server_picoseconds = None
}
TimestampsToReturn::Both => {
}
}
let client_handle = self.client_handle;
self.enqueue_notification_message(MonitoredItemNotification {
client_handle,
value: data_value,
});
trace!("Monitored item state = {:?}", self);
} else {
trace!("No data change on item, node {:?}", self.item_to_monitor.node_id);
}
data_change
} else {
false
}
}
fn is_event_filter(&self) -> bool {
match self.filter {
FilterType::EventFilter(_) => true,
_ => false
}
}
pub fn check_value(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, resend_data: bool) -> bool {
if self.monitoring_mode == MonitoringMode::Disabled {
panic!("Should not check value while monitoring mode is disabled");
}
let changed = if let Some(node) = address_space.find_node(&self.item_to_monitor.node_id) {
match AttributeId::from_u32(self.item_to_monitor.attribute_id) {
Ok(attribute_id) => {
let node = node.as_node();
match self.filter {
FilterType::EventFilter(_) => {
if attribute_id == AttributeId::EventNotifier {
let happened_since = self.last_sample_time.clone();
self.check_for_events(address_space, &happened_since, node)
} else {
false
}
}
_ => {
self.check_for_data_change(address_space, resend_data, attribute_id, node)
}
}
}
Err(_) => {
trace!("Item has no attribute_id {} so it hasn't changed, node {:?}", self.item_to_monitor.attribute_id, self.item_to_monitor.node_id);
false
}
}
} else {
trace!("Cannot find item to monitor, node {:?}", self.item_to_monitor.node_id);
false
};
self.last_sample_time = *now;
changed
}
pub fn enqueue_notification_message<T>(&mut self, notification: T) where T: Into<Notification> {
let overflow = if self.notification_queue.len() == self.queue_size {
trace!("Data change overflow, node {:?}", self.item_to_monitor.node_id);
if self.discard_oldest {
let _ = self.notification_queue.pop_front();
} else {
self.notification_queue.pop_back();
}
self.queue_size > 1
} else {
false
};
let mut notification = notification.into();
if overflow {
if let Notification::MonitoredItemNotification(ref mut notification) = notification {
notification.value.status = Some(notification.value.status() | StatusCode::OVERFLOW);
}
self.queue_overflow = true;
}
self.notification_queue.push_back(notification);
}
#[cfg(test)]
pub fn oldest_notification_message(&mut self) -> Option<Notification> {
if self.notification_queue.is_empty() {
None
} else {
self.queue_overflow = false;
self.notification_queue.pop_front()
}
}
pub fn all_notifications(&mut self) -> Option<Vec<Notification>> {
if self.notification_queue.is_empty() {
None
} else {
self.queue_overflow = false;
Some(self.notification_queue.drain(..).collect())
}
}
fn sanitize_sampling_interval(requested_sampling_interval: f64) -> f64 {
if requested_sampling_interval < 0.0 {
-1.0
} else if requested_sampling_interval == 0.0 || requested_sampling_interval < constants::MIN_SAMPLING_INTERVAL {
constants::MIN_SAMPLING_INTERVAL
} else {
requested_sampling_interval
}
}
fn sanitize_queue_size(requested_queue_size: usize) -> usize {
if requested_queue_size == 0 {
1
} else if requested_queue_size == 1 {
1
} else if requested_queue_size > constants::MAX_DATA_CHANGE_QUEUE_SIZE {
constants::MAX_DATA_CHANGE_QUEUE_SIZE
} else {
requested_queue_size
}
}
pub fn monitored_item_id(&self) -> u32 {
self.monitored_item_id
}
pub fn client_handle(&self) -> u32 {
self.client_handle
}
pub fn sampling_interval(&self) -> Duration {
self.sampling_interval
}
pub fn triggered_items(&self) -> &BTreeSet<u32> {
&self.triggered_items
}
pub fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
self.monitoring_mode = monitoring_mode;
}
pub fn monitoring_mode(&self) -> MonitoringMode {
self.monitoring_mode
}
pub fn queue_size(&self) -> usize {
self.queue_size
}
#[cfg(test)]
pub fn queue_overflow(&self) -> bool {
self.queue_overflow
}
#[cfg(test)]
pub fn notification_queue(&self) -> &VecDeque<Notification> {
&self.notification_queue
}
#[cfg(test)]
pub(crate) fn set_discard_oldest(&mut self, discard_oldest: bool) {
self.discard_oldest = discard_oldest;
}
}