use opcua_types::{
AttributeId, ByteString, DataTypeId, DateTime, DateTimeUtc, ExtensionObject, Guid, LocalizedText, NodeId,
NumericRange, ObjectId, ObjectTypeId, QualifiedName, service_types::TimeZoneDataType, TimestampsToReturn,
UAString, VariableTypeId, Variant,
};
use crate::address_space::{
AddressSpace,
object::ObjectBuilder,
relative_path::*,
variable::VariableBuilder,
};
pub trait Event {
type Err;
fn is_valid(&self) -> bool;
fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err>;
}
pub struct BaseEventType {
node_id: NodeId,
parent_node: NodeId,
browse_name: QualifiedName,
display_name: LocalizedText,
event_id: ByteString,
event_type: NodeId,
source_node: NodeId,
source_name: UAString,
time: DateTime,
receive_time: DateTime,
local_time: Option<TimeZoneDataType>,
message: LocalizedText,
severity: u16,
properties: Vec<(LocalizedText, Variant)>,
}
impl Event for BaseEventType {
type Err = ();
fn is_valid(&self) -> bool {
!self.node_id.is_null() &&
!self.event_id.is_null_or_empty() &&
!self.event_type.is_null() &&
self.severity >= 1 && self.severity <= 1000
}
fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err>
{
if self.is_valid() {
let ns = self.node_id.namespace;
let node_id = self.node_id.clone();
let object_builder = ObjectBuilder::new(&self.node_id, self.browse_name.clone(), self.display_name.clone())
.organized_by(self.parent_node.clone())
.has_type_definition(self.event_type.clone());
let object_builder = if !self.source_node.is_null() {
object_builder.has_event_source(self.source_node.clone())
} else {
object_builder
};
object_builder.insert(address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "EventId", "EventId", DataTypeId::ByteString, self.event_id.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "EventType", "EventType", DataTypeId::NodeId, self.event_type.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "SourceNode", "SourceNode", DataTypeId::NodeId, self.source_node.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "SourceName", "SourceName", DataTypeId::String, self.source_name.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "Time", "Time", DataTypeId::UtcTime, self.time.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "ReceiveTime", "ReceiveTime", DataTypeId::UtcTime, self.receive_time.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "Message", "Message", DataTypeId::LocalizedText, self.message.clone(), address_space);
self.add_property(&node_id, NodeId::next_numeric(ns), "Severity", "Severity", DataTypeId::UInt16, self.severity, address_space);
if let Some(ref local_time) = self.local_time {
let local_time = ExtensionObject::from_encodable(ObjectId::TimeZoneDataType_Encoding_DefaultBinary, local_time);
self.add_property(&node_id, NodeId::next_numeric(ns), "LocalTime", "LocalTime", DataTypeId::TimeZoneDataType, local_time, address_space);
}
Ok(node_id)
} else {
error!("Event is invalid and will not be inserted");
Err(())
}
}
}
impl BaseEventType {
pub fn new_now<R, E, S, T, U>(node_id: R, event_type_id: E, browse_name: S, display_name: T, parent_node: U) -> Self
where R: Into<NodeId>,
E: Into<NodeId>,
S: Into<QualifiedName>,
T: Into<LocalizedText>,
U: Into<NodeId>,
{
let now = DateTime::now();
Self::new(node_id, event_type_id, browse_name, display_name, parent_node, now)
}
pub fn new<R, E, S, T, U>(node_id: R, event_type_id: E, browse_name: S, display_name: T, parent_node: U, time: DateTime) -> Self
where R: Into<NodeId>,
E: Into<NodeId>,
S: Into<QualifiedName>,
T: Into<LocalizedText>,
U: Into<NodeId>,
{
Self {
node_id: node_id.into(),
browse_name: browse_name.into(),
display_name: display_name.into(),
parent_node: parent_node.into(),
event_id: Guid::new().into(),
event_type: event_type_id.into(),
source_node: NodeId::null(),
source_name: UAString::null(),
time: time.clone(),
receive_time: time,
local_time: None,
message: LocalizedText::null(),
severity: 1,
properties: Vec::with_capacity(20),
}
}
pub fn add_property<T, R, S, U, V>(&mut self, event_id: &NodeId, property_id: T, browse_name: R, display_name: S, data_type: U, value: V, address_space: &mut AddressSpace)
where T: Into<NodeId>,
R: Into<QualifiedName>,
S: Into<LocalizedText>,
U: Into<NodeId>,
V: Into<Variant>
{
let display_name = display_name.into();
let value = value.into();
self.properties.push((display_name.clone(), value.clone()));
Self::do_add_property(event_id, property_id, browse_name, display_name, data_type, value, address_space)
}
fn do_add_property<T, R, S, U, V>(event_id: &NodeId, property_id: T, browse_name: R, display_name: S, data_type: U, value: V, address_space: &mut AddressSpace)
where T: Into<NodeId>,
R: Into<QualifiedName>,
S: Into<LocalizedText>,
U: Into<NodeId>,
V: Into<Variant>
{
VariableBuilder::new(&property_id.into(), browse_name, display_name)
.property_of(event_id.clone())
.has_type_definition(VariableTypeId::PropertyType)
.data_type(data_type)
.value(value)
.insert(address_space);
}
pub fn message<T>(mut self, message: T) -> Self where T: Into<LocalizedText> {
self.message = message.into();
self
}
pub fn source_node<T>(mut self, source_node: T) -> Self where T: Into<NodeId> {
self.source_node = source_node.into();
self
}
pub fn source_name<T>(mut self, source_name: T) -> Self where T: Into<UAString> {
self.source_name = source_name.into();
self
}
pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> Self {
self.local_time = local_time;
self
}
pub fn severity(mut self, severity: u16) -> Self {
self.severity = severity;
self
}
pub fn receive_time(mut self, receive_time: DateTime) -> Self {
self.receive_time = receive_time;
self
}
pub fn properties(&self) -> &Vec<(LocalizedText, Variant)> {
&self.properties
}
}
macro_rules! base_event_impl {
( $event:ident, $base:ident ) => {
impl $event {
pub fn add_property<T, R, S, U, V>(&mut self, event_id: &NodeId, property_id: T, browse_name: R, display_name: S, data_type: U, value: V, address_space: &mut AddressSpace)
where T: Into<NodeId>,
R: Into<QualifiedName>,
S: Into<LocalizedText>,
U: Into<NodeId>,
V: Into<Variant>
{
self.$base.add_property(event_id, property_id, browse_name, display_name, data_type, value, address_space);
}
pub fn message<T>(mut self, message: T) -> $event where T: Into<LocalizedText> {
self.$base = self.$base.message(message);
self
}
pub fn source_node<T>(mut self, source_node: T) -> $event where T: Into<NodeId> {
self.$base = self.$base.source_node(source_node);
self
}
pub fn source_name<T>(mut self, source_name: T) -> $event where T: Into<UAString> {
self.$base = self.$base.source_name(source_name);
self
}
pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> $event {
self.$base = self.$base.local_time(local_time);
self
}
pub fn severity(mut self, severity: u16) -> $event {
self.$base = self.$base.severity(severity);
self
}
pub fn receive_time(mut self, receive_time: DateTime) -> $event {
self.$base = self.$base.receive_time(receive_time);
self
}
}
}
}
fn event_source_node(event_id: &NodeId, address_space: &AddressSpace) -> Option<NodeId> {
if let Ok(event_time_node) = find_node_from_browse_path(address_space, event_id, &["SourceNode".into()]) {
if let Some(value) = event_time_node.as_node().get_attribute(TimestampsToReturn::Neither, AttributeId::Value, NumericRange::None, &QualifiedName::null()) {
if let Some(value) = value.value {
match value {
Variant::NodeId(node_id) => Some(*node_id),
_ => None
}
} else {
None
}
} else {
None
}
} else {
None
}
}
fn event_time(event_id: &NodeId, address_space: &AddressSpace) -> Option<DateTime> {
if let Ok(event_time_node) = find_node_from_browse_path(address_space, event_id, &["Time".into()]) {
if let Some(value) = event_time_node.as_node().get_attribute(TimestampsToReturn::Neither, AttributeId::Value, NumericRange::None, &QualifiedName::null()) {
if let Some(value) = value.value {
match value {
Variant::DateTime(date_time) => Some(*date_time),
_ => None
}
} else {
None
}
} else {
None
}
} else {
None
}
}
pub fn filter_events<T, R, F>(source_object_id: T, event_type_id: R, address_space: &AddressSpace, time_predicate: F) -> Option<Vec<NodeId>>
where T: Into<NodeId>,
R: Into<NodeId>,
F: Fn(&DateTimeUtc) -> bool
{
let event_type_id = event_type_id.into();
let source_object_id = source_object_id.into();
if let Some(events) = address_space.find_objects_by_type(event_type_id, true) {
let event_ids = events.iter()
.filter(move |event_id| {
let mut filter = false;
if let Some(event_time) = event_time(event_id, address_space) {
if time_predicate(&event_time.as_chrono()) {
if let Some(source_node) = event_source_node(event_id, address_space) {
filter = source_node == source_object_id
}
}
}
filter
})
.cloned()
.collect::<Vec<NodeId>>();
if event_ids.is_empty() { None } else { Some(event_ids) }
} else {
None
}
}
pub fn purge_events<T, R>(source_object_id: T, event_type_id: R, address_space: &mut AddressSpace, happened_before: &DateTimeUtc) -> usize
where T: Into<NodeId>,
R: Into<NodeId>
{
if let Some(events) = filter_events(source_object_id, event_type_id, address_space, move |event_time| event_time < happened_before) {
info!("Deleting some events from the address space");
let len = events.len();
events.into_iter().for_each(|node_id| {
debug!("Deleting event {}", node_id);
address_space.delete(&node_id, true);
});
len
} else {
0
}
}
pub fn events_for_object<T>(source_object_id: T, address_space: &AddressSpace, happened_since: &DateTimeUtc) -> Option<Vec<NodeId>>
where T: Into<NodeId>
{
filter_events(source_object_id, ObjectTypeId::BaseEventType, address_space, move |event_time| event_time >= happened_since)
}
#[test]
fn test_event_source_node() {
let mut address_space = AddressSpace::new();
let ns = address_space.register_namespace("urn:test").unwrap();
let event_id = NodeId::next_numeric(ns);
let event_type_id = ObjectTypeId::BaseEventType;
let mut event = BaseEventType::new(&event_id, event_type_id, "Event1", "", NodeId::objects_folder_id(), DateTime::now())
.source_node(ObjectId::Server_ServerCapabilities);
assert!(event.raise(&mut address_space).is_ok());
assert_eq!(event_source_node(&event_id, &address_space).unwrap(), ObjectId::Server_ServerCapabilities.into());
}
#[test]
fn test_event_time() {
let mut address_space = AddressSpace::new();
let ns = address_space.register_namespace("urn:test").unwrap();
let event_id = NodeId::next_numeric(ns);
let event_type_id = ObjectTypeId::BaseEventType;
let mut event = BaseEventType::new(&event_id, event_type_id, "Event1", "", NodeId::objects_folder_id(), DateTime::now())
.source_node(ObjectId::Server_ServerCapabilities);
let expected_time = event.time.clone();
assert!(event.raise(&mut address_space).is_ok());
assert_eq!(event_time(&event_id, &address_space).unwrap(), expected_time);
}
#[test]
fn test_events_for_object() {
let mut address_space = AddressSpace::new();
let ns = address_space.register_namespace("urn:test").unwrap();
let happened_since = chrono::Utc::now();
let event_id = NodeId::next_numeric(ns);
let event_type_id = ObjectTypeId::BaseEventType;
let mut event = BaseEventType::new(&event_id, event_type_id, "Event1", "", NodeId::objects_folder_id(), DateTime::now())
.source_node(ObjectId::Server_ServerCapabilities);
assert!(event.raise(&mut address_space).is_ok());
let mut events = events_for_object(ObjectId::Server_ServerCapabilities, &address_space, &happened_since).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events.pop().unwrap(), event_id);
}
#[test]
fn test_purge_events() {
use opcua_console_logging;
use opcua_types::Identifier;
opcua_console_logging::init();
let mut address_space = AddressSpace::new();
let ns = address_space.register_namespace("urn:mynamespace").unwrap();
let first_node_id = match NodeId::next_numeric(ns).identifier {
Identifier::Numeric(i) => i + 1,
_ => panic!()
};
let source_node = ObjectId::Server_ServerCapabilities;
let start_time = DateTime::now().as_chrono();
let mut time = start_time.clone();
let mut last_purged_node_id = 0;
let event_type_id = ObjectTypeId::BaseEventType;
(0..10).for_each(|i| {
let event_id = NodeId::new(ns, format!("Event{}", i));
let event_name = format!("Event {}", i);
let mut event = BaseEventType::new(&event_id, event_type_id, event_name, "", NodeId::objects_folder_id(), DateTime::from(time))
.source_node(source_node);
assert!(event.raise(&mut address_space).is_ok());
if i == 4 {
last_purged_node_id = match NodeId::next_numeric(ns).identifier {
Identifier::Numeric(i) => i,
_ => panic!()
};
}
time = time + chrono::Duration::minutes(5);
});
let events = events_for_object(source_node, &address_space, &start_time).unwrap();
assert_eq!(events.len(), 10);
let happened_before = start_time + chrono::Duration::minutes(25);
assert_eq!(purge_events(source_node, ObjectTypeId::BaseEventType, &mut address_space, &happened_before), 5);
let events = events_for_object(source_node, &address_space, &start_time).unwrap();
assert_eq!(events.len(), 5);
let references = address_space.references();
(0..5).for_each(|i| {
let event_id = NodeId::new(ns, format!("Event{}", i));
assert!(!references.reference_to_node_exists(&event_id));
});
(5..10).for_each(|i| {
let event_id = NodeId::new(ns, format!("Event{}", i));
assert!(references.reference_to_node_exists(&event_id));
});
let source_node: NodeId = source_node.into();
debug!("Expecting to still find source node {}", source_node);
assert!(address_space.find_node(&source_node).is_some());
(first_node_id..last_purged_node_id).for_each(|i| {
let node_id = NodeId::new(ns, i);
assert!(address_space.find_node(&node_id).is_none());
assert!(!references.reference_to_node_exists(&node_id));
});
}