1use opcua_types::{
7 service_types::TimeZoneDataType, AttributeId, ByteString, DataTypeId, DateTime, DateTimeUtc,
8 ExtensionObject, Guid, LocalizedText, NodeId, NumericRange, ObjectId, ObjectTypeId,
9 QualifiedName, TimestampsToReturn, UAString, VariableTypeId, Variant,
10};
11
12use crate::address_space::{
13 object::ObjectBuilder, relative_path::*, variable::VariableBuilder, AddressSpace,
14};
15
16pub trait Event {
18 type Err;
19
20 fn is_valid(&self) -> bool;
22
23 fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err>;
25}
26
27pub struct BaseEventType {
29 node_id: NodeId,
31 parent_node: NodeId,
33 browse_name: QualifiedName,
35 display_name: LocalizedText,
37 event_id: ByteString,
39 event_type: NodeId,
41 source_node: NodeId,
44 source_name: UAString,
47 time: DateTime,
50 receive_time: DateTime,
53 local_time: Option<TimeZoneDataType>,
56 message: LocalizedText,
59 severity: u16,
70 properties: Vec<(LocalizedText, Variant)>,
72}
73
74impl Event for BaseEventType {
75 type Err = ();
76
77 fn is_valid(&self) -> bool {
78 !self.node_id.is_null()
79 && !self.event_id.is_null_or_empty()
80 && !self.event_type.is_null()
81 && self.severity >= 1
82 && self.severity <= 1000
83 }
84
85 fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err> {
86 if self.is_valid() {
87 let ns = self.node_id.namespace;
89 let node_id = self.node_id.clone();
90
91 let object_builder = ObjectBuilder::new(
92 &self.node_id,
93 self.browse_name.clone(),
94 self.display_name.clone(),
95 )
96 .organized_by(self.parent_node.clone())
97 .has_type_definition(self.event_type.clone());
98
99 let object_builder = if !self.source_node.is_null() {
100 object_builder.has_event_source(self.source_node.clone())
101 } else {
102 object_builder
103 };
104 object_builder.insert(address_space);
105
106 self.add_property(
108 &node_id,
109 NodeId::next_numeric(ns),
110 "EventId",
111 "EventId",
112 DataTypeId::ByteString,
113 self.event_id.clone(),
114 address_space,
115 );
116 self.add_property(
117 &node_id,
118 NodeId::next_numeric(ns),
119 "EventType",
120 "EventType",
121 DataTypeId::NodeId,
122 self.event_type.clone(),
123 address_space,
124 );
125 self.add_property(
126 &node_id,
127 NodeId::next_numeric(ns),
128 "SourceNode",
129 "SourceNode",
130 DataTypeId::NodeId,
131 self.source_node.clone(),
132 address_space,
133 );
134 self.add_property(
135 &node_id,
136 NodeId::next_numeric(ns),
137 "SourceName",
138 "SourceName",
139 DataTypeId::String,
140 self.source_name.clone(),
141 address_space,
142 );
143 self.add_property(
144 &node_id,
145 NodeId::next_numeric(ns),
146 "Time",
147 "Time",
148 DataTypeId::UtcTime,
149 self.time,
150 address_space,
151 );
152 self.add_property(
153 &node_id,
154 NodeId::next_numeric(ns),
155 "ReceiveTime",
156 "ReceiveTime",
157 DataTypeId::UtcTime,
158 self.receive_time,
159 address_space,
160 );
161 self.add_property(
162 &node_id,
163 NodeId::next_numeric(ns),
164 "Message",
165 "Message",
166 DataTypeId::LocalizedText,
167 self.message.clone(),
168 address_space,
169 );
170 self.add_property(
171 &node_id,
172 NodeId::next_numeric(ns),
173 "Severity",
174 "Severity",
175 DataTypeId::UInt16,
176 self.severity,
177 address_space,
178 );
179
180 if let Some(ref local_time) = self.local_time {
182 let local_time = ExtensionObject::from_encodable(
184 ObjectId::TimeZoneDataType_Encoding_DefaultBinary,
185 local_time,
186 );
187 self.add_property(
188 &node_id,
189 NodeId::next_numeric(ns),
190 "LocalTime",
191 "LocalTime",
192 DataTypeId::TimeZoneDataType,
193 local_time,
194 address_space,
195 );
196 }
197
198 Ok(node_id)
199 } else {
200 error!("Event is invalid and will not be inserted");
201 Err(())
202 }
203 }
204}
205
206impl BaseEventType {
207 pub fn new_now<R, E, S, T, U>(
208 node_id: R,
209 event_type_id: E,
210 browse_name: S,
211 display_name: T,
212 parent_node: U,
213 ) -> Self
214 where
215 R: Into<NodeId>,
216 E: Into<NodeId>,
217 S: Into<QualifiedName>,
218 T: Into<LocalizedText>,
219 U: Into<NodeId>,
220 {
221 let now = DateTime::now();
222 Self::new(
223 node_id,
224 event_type_id,
225 browse_name,
226 display_name,
227 parent_node,
228 now,
229 )
230 }
231
232 pub fn new<R, E, S, T, U>(
233 node_id: R,
234 event_type_id: E,
235 browse_name: S,
236 display_name: T,
237 parent_node: U,
238 time: DateTime,
239 ) -> Self
240 where
241 R: Into<NodeId>,
242 E: Into<NodeId>,
243 S: Into<QualifiedName>,
244 T: Into<LocalizedText>,
245 U: Into<NodeId>,
246 {
247 Self {
248 node_id: node_id.into(),
249 browse_name: browse_name.into(),
250 display_name: display_name.into(),
251 parent_node: parent_node.into(),
252 event_id: Guid::new().into(),
253 event_type: event_type_id.into(),
254 source_node: NodeId::null(),
255 source_name: UAString::null(),
256 time,
257 receive_time: time,
258 local_time: None,
259 message: LocalizedText::null(),
260 severity: 1,
261 properties: Vec::with_capacity(20),
262 }
263 }
264
265 pub fn add_property<T, R, S, U, V>(
267 &mut self,
268 event_id: &NodeId,
269 property_id: T,
270 browse_name: R,
271 display_name: S,
272 data_type: U,
273 value: V,
274 address_space: &mut AddressSpace,
275 ) where
276 T: Into<NodeId>,
277 R: Into<QualifiedName>,
278 S: Into<LocalizedText>,
279 U: Into<NodeId>,
280 V: Into<Variant>,
281 {
282 let display_name = display_name.into();
283 let value = value.into();
284 self.properties.push((display_name.clone(), value.clone()));
285
286 Self::do_add_property(
287 event_id,
288 property_id,
289 browse_name,
290 display_name,
291 data_type,
292 value,
293 address_space,
294 )
295 }
296
297 fn do_add_property<T, R, S, U, V>(
299 event_id: &NodeId,
300 property_id: T,
301 browse_name: R,
302 display_name: S,
303 data_type: U,
304 value: V,
305 address_space: &mut AddressSpace,
306 ) where
307 T: Into<NodeId>,
308 R: Into<QualifiedName>,
309 S: Into<LocalizedText>,
310 U: Into<NodeId>,
311 V: Into<Variant>,
312 {
313 VariableBuilder::new(&property_id.into(), browse_name, display_name)
314 .property_of(event_id.clone())
315 .has_type_definition(VariableTypeId::PropertyType)
316 .data_type(data_type)
317 .value(value)
318 .insert(address_space);
319 }
320
321 pub fn message<T>(mut self, message: T) -> Self
322 where
323 T: Into<LocalizedText>,
324 {
325 self.message = message.into();
326 self
327 }
328
329 pub fn source_node<T>(mut self, source_node: T) -> Self
330 where
331 T: Into<NodeId>,
332 {
333 self.source_node = source_node.into();
334 self
335 }
336
337 pub fn source_name<T>(mut self, source_name: T) -> Self
338 where
339 T: Into<UAString>,
340 {
341 self.source_name = source_name.into();
342 self
343 }
344
345 pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> Self {
346 self.local_time = local_time;
347 self
348 }
349
350 pub fn severity(mut self, severity: u16) -> Self {
351 self.severity = severity;
352 self
353 }
354
355 pub fn receive_time(mut self, receive_time: DateTime) -> Self {
356 self.receive_time = receive_time;
357 self
358 }
359
360 pub fn properties(&self) -> &Vec<(LocalizedText, Variant)> {
361 &self.properties
362 }
363}
364
365macro_rules! base_event_impl {
368 ( $event:ident, $base:ident ) => {
369 impl $event {
370 pub fn add_property<T, R, S, U, V>(
371 &mut self,
372 event_id: &NodeId,
373 property_id: T,
374 browse_name: R,
375 display_name: S,
376 data_type: U,
377 value: V,
378 address_space: &mut AddressSpace,
379 ) where
380 T: Into<NodeId>,
381 R: Into<QualifiedName>,
382 S: Into<LocalizedText>,
383 U: Into<NodeId>,
384 V: Into<Variant>,
385 {
386 self.$base.add_property(
387 event_id,
388 property_id,
389 browse_name,
390 display_name,
391 data_type,
392 value,
393 address_space,
394 );
395 }
396
397 pub fn message<T>(mut self, message: T) -> $event
398 where
399 T: Into<LocalizedText>,
400 {
401 self.$base = self.$base.message(message);
402 self
403 }
404
405 pub fn source_node<T>(mut self, source_node: T) -> $event
406 where
407 T: Into<NodeId>,
408 {
409 self.$base = self.$base.source_node(source_node);
410 self
411 }
412
413 pub fn source_name<T>(mut self, source_name: T) -> $event
414 where
415 T: Into<UAString>,
416 {
417 self.$base = self.$base.source_name(source_name);
418 self
419 }
420
421 pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> $event {
422 self.$base = self.$base.local_time(local_time);
423 self
424 }
425
426 pub fn severity(mut self, severity: u16) -> $event {
427 self.$base = self.$base.severity(severity);
428 self
429 }
430
431 pub fn receive_time(mut self, receive_time: DateTime) -> $event {
432 self.$base = self.$base.receive_time(receive_time);
433 self
434 }
435 }
436 };
437}
438
439fn event_source_node(event_id: &NodeId, address_space: &AddressSpace) -> Option<NodeId> {
440 if let Ok(event_time_node) =
441 find_node_from_browse_path(address_space, event_id, &["SourceNode".into()])
442 {
443 if let Some(value) = event_time_node.as_node().get_attribute(
444 TimestampsToReturn::Neither,
445 AttributeId::Value,
446 NumericRange::None,
447 &QualifiedName::null(),
448 ) {
449 if let Some(value) = value.value {
450 match value {
451 Variant::NodeId(node_id) => Some(*node_id),
452 _ => None,
453 }
454 } else {
455 None
456 }
457 } else {
458 None
459 }
460 } else {
461 None
462 }
463}
464
465fn event_time(event_id: &NodeId, address_space: &AddressSpace) -> Option<DateTime> {
466 if let Ok(event_time_node) =
468 find_node_from_browse_path(address_space, event_id, &["Time".into()])
469 {
470 if let Some(value) = event_time_node.as_node().get_attribute(
471 TimestampsToReturn::Neither,
472 AttributeId::Value,
473 NumericRange::None,
474 &QualifiedName::null(),
475 ) {
476 if let Some(value) = value.value {
477 match value {
478 Variant::DateTime(date_time) => Some(*date_time),
479 _ => None,
480 }
481 } else {
482 None
483 }
484 } else {
485 None
486 }
487 } else {
488 None
489 }
490}
491
492pub fn filter_events<T, R, F>(
494 source_object_id: T,
495 event_type_id: R,
496 address_space: &AddressSpace,
497 time_predicate: F,
498) -> Option<Vec<NodeId>>
499where
500 T: Into<NodeId>,
501 R: Into<NodeId>,
502 F: Fn(&DateTimeUtc) -> bool,
503{
504 let event_type_id = event_type_id.into();
505 let source_object_id = source_object_id.into();
506 if let Some(events) = address_space.find_objects_by_type(event_type_id, true) {
508 let event_ids = events
509 .iter()
510 .filter(move |event_id| {
511 let mut filter = false;
512 if let Some(source_node) = event_source_node(event_id, address_space) {
513 if let Some(event_time) = event_time(event_id, address_space) {
515 if time_predicate(&event_time.as_chrono()) {
517 filter = source_node == source_object_id
519 }
520 }
521 }
522 filter
523 })
524 .cloned()
525 .collect::<Vec<NodeId>>();
526 if event_ids.is_empty() {
527 None
528 } else {
529 Some(event_ids)
530 }
531 } else {
532 None
533 }
534}
535
536pub fn purge_events<T, R>(
537 source_object_id: T,
538 event_type_id: R,
539 address_space: &mut AddressSpace,
540 happened_before: &DateTimeUtc,
541) -> usize
542where
543 T: Into<NodeId>,
544 R: Into<NodeId>,
545{
546 if let Some(events) = filter_events(
547 source_object_id,
548 event_type_id,
549 address_space,
550 move |event_time| event_time < happened_before,
551 ) {
552 info!("Deleting some events from the address space");
554 let len = events.len();
555 events.into_iter().for_each(|node_id| {
556 debug!("Deleting event {}", node_id);
557 address_space.delete(&node_id, true);
558 });
559 len
560 } else {
561 0
562 }
563}
564
565pub fn events_for_object<T>(
567 source_object_id: T,
568 address_space: &AddressSpace,
569 happened_since: &DateTimeUtc,
570) -> Option<Vec<NodeId>>
571where
572 T: Into<NodeId>,
573{
574 filter_events(
575 source_object_id,
576 ObjectTypeId::BaseEventType,
577 address_space,
578 move |event_time| event_time >= happened_since,
579 )
580}
581
582#[test]
583fn test_event_source_node() {
584 let mut address_space = AddressSpace::new();
585 let ns = address_space.register_namespace("urn:test").unwrap();
586 let event_id = NodeId::next_numeric(ns);
588 let event_type_id = ObjectTypeId::BaseEventType;
589 let mut event = BaseEventType::new(
590 &event_id,
591 event_type_id,
592 "Event1",
593 "",
594 NodeId::objects_folder_id(),
595 DateTime::now(),
596 )
597 .source_node(ObjectId::Server_ServerCapabilities);
598 assert!(event.raise(&mut address_space).is_ok());
599 assert_eq!(
601 event_source_node(&event_id, &address_space).unwrap(),
602 ObjectId::Server_ServerCapabilities.into()
603 );
604}
605
606#[test]
607fn test_event_time() {
608 let mut address_space = AddressSpace::new();
609 let ns = address_space.register_namespace("urn:test").unwrap();
610 let event_id = NodeId::next_numeric(ns);
612 let event_type_id = ObjectTypeId::BaseEventType;
613 let mut event = BaseEventType::new(
614 &event_id,
615 event_type_id,
616 "Event1",
617 "",
618 NodeId::objects_folder_id(),
619 DateTime::now(),
620 )
621 .source_node(ObjectId::Server_ServerCapabilities);
622 let expected_time = event.time.clone();
623 assert!(event.raise(&mut address_space).is_ok());
624 assert_eq!(
626 event_time(&event_id, &address_space).unwrap(),
627 expected_time
628 );
629}
630
631#[test]
632fn test_events_for_object() {
633 let mut address_space = AddressSpace::new();
634 let ns = address_space.register_namespace("urn:test").unwrap();
635
636 let happened_since = chrono::Utc::now();
638 let event_id = NodeId::next_numeric(ns);
639 let event_type_id = ObjectTypeId::BaseEventType;
640 let mut event = BaseEventType::new(
641 &event_id,
642 event_type_id,
643 "Event1",
644 "",
645 NodeId::objects_folder_id(),
646 DateTime::now(),
647 )
648 .source_node(ObjectId::Server_ServerCapabilities);
649 assert!(event.raise(&mut address_space).is_ok());
650
651 let mut events = events_for_object(
653 ObjectId::Server_ServerCapabilities,
654 &address_space,
655 &happened_since,
656 )
657 .unwrap();
658 assert_eq!(events.len(), 1);
659 assert_eq!(events.pop().unwrap(), event_id);
660}
661
662#[test]
663fn test_purge_events() {
664 use opcua_console_logging;
665 use opcua_types::Identifier;
666
667 opcua_console_logging::init();
668
669 let mut address_space = AddressSpace::new();
670
671 let ns = address_space.register_namespace("urn:mynamespace").unwrap();
673
674 let first_node_id = match NodeId::next_numeric(ns).identifier {
683 Identifier::Numeric(i) => i + 1,
684 _ => panic!(),
685 };
686
687 let source_node = ObjectId::Server_ServerCapabilities;
688
689 let start_time = DateTime::now().as_chrono();
691 let mut time = start_time.clone();
692 let mut last_purged_node_id = 0;
693
694 let event_type_id = ObjectTypeId::BaseEventType;
695
696 (0..10).for_each(|i| {
697 let event_id = NodeId::new(ns, format!("Event{}", i));
698 let event_name = format!("Event {}", i);
699 let mut event = BaseEventType::new(
700 &event_id,
701 event_type_id,
702 event_name,
703 "",
704 NodeId::objects_folder_id(),
705 DateTime::from(time),
706 )
707 .source_node(source_node);
708 assert!(event.raise(&mut address_space).is_ok());
709
710 if i == 4 {
713 last_purged_node_id = match NodeId::next_numeric(ns).identifier {
714 Identifier::Numeric(i) => i,
715 _ => panic!(),
716 };
717 }
718
719 time = time + chrono::Duration::minutes(5);
720 });
721
722 let events = events_for_object(source_node, &address_space, &start_time).unwrap();
724 assert_eq!(events.len(), 10);
725
726 let happened_before = start_time + chrono::Duration::minutes(25);
728 assert_eq!(
729 purge_events(
730 source_node,
731 ObjectTypeId::BaseEventType,
732 &mut address_space,
733 &happened_before
734 ),
735 5
736 );
737
738 let events = events_for_object(source_node, &address_space, &start_time).unwrap();
740 assert_eq!(events.len(), 5);
741
742 let references = address_space.references();
744 (0..5).for_each(|i| {
745 let event_id = NodeId::new(ns, format!("Event{}", i));
746 assert!(!references.reference_to_node_exists(&event_id));
747 });
748 (5..10).for_each(|i| {
749 let event_id = NodeId::new(ns, format!("Event{}", i));
750 assert!(references.reference_to_node_exists(&event_id));
751 });
752
753 let source_node: NodeId = source_node.into();
756 debug!("Expecting to still find source node {}", source_node);
757 assert!(address_space.find_node(&source_node).is_some());
758
759 (first_node_id..last_purged_node_id).for_each(|i| {
763 let node_id = NodeId::new(ns, i);
765 assert!(address_space.find_node(&node_id).is_none());
766 assert!(!references.reference_to_node_exists(&node_id));
767 });
768}