opcua_server/events/
event.rs

// OPCUA for Rust
// SPDX-License-Identifier: MPL-2.0
// Copyright (C) 2017-2022 Adam Lock

//! Contains functions for generating events and adding them to the address space of the server.
6use opcua_types::{
7    service_types::TimeZoneDataType, AttributeId, ByteString, DataTypeId, DateTime, DateTimeUtc,
8    ExtensionObject, Guid, LocalizedText, NodeId, NumericRange, ObjectId, ObjectTypeId,
9    QualifiedName, TimestampsToReturn, UAString, VariableTypeId, Variant,
10};
11
12use crate::address_space::{
13    object::ObjectBuilder, relative_path::*, variable::VariableBuilder, AddressSpace,
14};
15
16/// Events can implement this to populate themselves into the address space
17pub trait Event {
18    type Err;
19
20    /// Tests if the event is valid
21    fn is_valid(&self) -> bool;
22
23    /// Raises the event, i.e. adds the object into the address space. The event must be valid to be inserted.
24    fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err>;
25}
26
27/// This corresponds to BaseEventType definition in OPC UA Part 5
28pub struct BaseEventType {
29    /// Node id
30    node_id: NodeId,
31    /// Parent node
32    parent_node: NodeId,
33    /// Browse name
34    browse_name: QualifiedName,
35    /// Display name
36    display_name: LocalizedText,
37    /// A unique identifier for an event, e.g. a GUID in a byte string
38    event_id: ByteString,
39    /// Event type describes the type of event
40    event_type: NodeId,
41    /// Source node identifies the node that the event originated from
42    /// or null.
43    source_node: NodeId,
44    /// Source name provides the description of the source of the event,
45    /// e.g. the display of the event source
46    source_name: UAString,
47    /// Time provides the time the event occurred. As close
48    /// to the event generator as possible.
49    time: DateTime,
50    /// Receive time provides the time the OPC UA server received
51    /// the event from the underlying device of another server.
52    receive_time: DateTime,
53    /// Local time (optional) is a structure containing
54    /// the offset and daylightsaving flag.
55    local_time: Option<TimeZoneDataType>,
56    /// Message provides a human readable localizable text description
57    /// of the event.
58    message: LocalizedText,
59    /// Severity is an indication of the urgency of the event. Values from 1 to 1000, with 1 as the lowest
60    /// severity and 1000 being the highest. A value of 1000 would indicate an event of catastrophic nature.
61    ///
62    /// Guidance:
63    ///
64    /// * 801-1000 - High
65    /// * 601-800 - Medium High
66    /// * 401-600 - Medium
67    /// * 201-400 - Medium Low
68    /// * 1-200 - Low
69    severity: u16,
70    /// Properties as string/values in the order they were added
71    properties: Vec<(LocalizedText, Variant)>,
72}
73
74impl Event for BaseEventType {
75    type Err = ();
76
77    fn is_valid(&self) -> bool {
78        !self.node_id.is_null()
79            && !self.event_id.is_null_or_empty()
80            && !self.event_type.is_null()
81            && self.severity >= 1
82            && self.severity <= 1000
83    }
84
85    fn raise(&mut self, address_space: &mut AddressSpace) -> Result<NodeId, Self::Err> {
86        if self.is_valid() {
87            // create an event object in a folder with the
88            let ns = self.node_id.namespace;
89            let node_id = self.node_id.clone();
90
91            let object_builder = ObjectBuilder::new(
92                &self.node_id,
93                self.browse_name.clone(),
94                self.display_name.clone(),
95            )
96            .organized_by(self.parent_node.clone())
97            .has_type_definition(self.event_type.clone());
98
99            let object_builder = if !self.source_node.is_null() {
100                object_builder.has_event_source(self.source_node.clone())
101            } else {
102                object_builder
103            };
104            object_builder.insert(address_space);
105
106            // Mandatory properties
107            self.add_property(
108                &node_id,
109                NodeId::next_numeric(ns),
110                "EventId",
111                "EventId",
112                DataTypeId::ByteString,
113                self.event_id.clone(),
114                address_space,
115            );
116            self.add_property(
117                &node_id,
118                NodeId::next_numeric(ns),
119                "EventType",
120                "EventType",
121                DataTypeId::NodeId,
122                self.event_type.clone(),
123                address_space,
124            );
125            self.add_property(
126                &node_id,
127                NodeId::next_numeric(ns),
128                "SourceNode",
129                "SourceNode",
130                DataTypeId::NodeId,
131                self.source_node.clone(),
132                address_space,
133            );
134            self.add_property(
135                &node_id,
136                NodeId::next_numeric(ns),
137                "SourceName",
138                "SourceName",
139                DataTypeId::String,
140                self.source_name.clone(),
141                address_space,
142            );
143            self.add_property(
144                &node_id,
145                NodeId::next_numeric(ns),
146                "Time",
147                "Time",
148                DataTypeId::UtcTime,
149                self.time,
150                address_space,
151            );
152            self.add_property(
153                &node_id,
154                NodeId::next_numeric(ns),
155                "ReceiveTime",
156                "ReceiveTime",
157                DataTypeId::UtcTime,
158                self.receive_time,
159                address_space,
160            );
161            self.add_property(
162                &node_id,
163                NodeId::next_numeric(ns),
164                "Message",
165                "Message",
166                DataTypeId::LocalizedText,
167                self.message.clone(),
168                address_space,
169            );
170            self.add_property(
171                &node_id,
172                NodeId::next_numeric(ns),
173                "Severity",
174                "Severity",
175                DataTypeId::UInt16,
176                self.severity,
177                address_space,
178            );
179
180            // LocalTime is optional
181            if let Some(ref local_time) = self.local_time {
182                // Serialise to extension object
183                let local_time = ExtensionObject::from_encodable(
184                    ObjectId::TimeZoneDataType_Encoding_DefaultBinary,
185                    local_time,
186                );
187                self.add_property(
188                    &node_id,
189                    NodeId::next_numeric(ns),
190                    "LocalTime",
191                    "LocalTime",
192                    DataTypeId::TimeZoneDataType,
193                    local_time,
194                    address_space,
195                );
196            }
197
198            Ok(node_id)
199        } else {
200            error!("Event is invalid and will not be inserted");
201            Err(())
202        }
203    }
204}
205
206impl BaseEventType {
207    pub fn new_now<R, E, S, T, U>(
208        node_id: R,
209        event_type_id: E,
210        browse_name: S,
211        display_name: T,
212        parent_node: U,
213    ) -> Self
214    where
215        R: Into<NodeId>,
216        E: Into<NodeId>,
217        S: Into<QualifiedName>,
218        T: Into<LocalizedText>,
219        U: Into<NodeId>,
220    {
221        let now = DateTime::now();
222        Self::new(
223            node_id,
224            event_type_id,
225            browse_name,
226            display_name,
227            parent_node,
228            now,
229        )
230    }
231
232    pub fn new<R, E, S, T, U>(
233        node_id: R,
234        event_type_id: E,
235        browse_name: S,
236        display_name: T,
237        parent_node: U,
238        time: DateTime,
239    ) -> Self
240    where
241        R: Into<NodeId>,
242        E: Into<NodeId>,
243        S: Into<QualifiedName>,
244        T: Into<LocalizedText>,
245        U: Into<NodeId>,
246    {
247        Self {
248            node_id: node_id.into(),
249            browse_name: browse_name.into(),
250            display_name: display_name.into(),
251            parent_node: parent_node.into(),
252            event_id: Guid::new().into(),
253            event_type: event_type_id.into(),
254            source_node: NodeId::null(),
255            source_name: UAString::null(),
256            time,
257            receive_time: time,
258            local_time: None,
259            message: LocalizedText::null(),
260            severity: 1,
261            properties: Vec::with_capacity(20),
262        }
263    }
264
265    /// Add a property to the event object
266    pub fn add_property<T, R, S, U, V>(
267        &mut self,
268        event_id: &NodeId,
269        property_id: T,
270        browse_name: R,
271        display_name: S,
272        data_type: U,
273        value: V,
274        address_space: &mut AddressSpace,
275    ) where
276        T: Into<NodeId>,
277        R: Into<QualifiedName>,
278        S: Into<LocalizedText>,
279        U: Into<NodeId>,
280        V: Into<Variant>,
281    {
282        let display_name = display_name.into();
283        let value = value.into();
284        self.properties.push((display_name.clone(), value.clone()));
285
286        Self::do_add_property(
287            event_id,
288            property_id,
289            browse_name,
290            display_name,
291            data_type,
292            value,
293            address_space,
294        )
295    }
296
297    /// Helper function inserts a property for the event
298    fn do_add_property<T, R, S, U, V>(
299        event_id: &NodeId,
300        property_id: T,
301        browse_name: R,
302        display_name: S,
303        data_type: U,
304        value: V,
305        address_space: &mut AddressSpace,
306    ) where
307        T: Into<NodeId>,
308        R: Into<QualifiedName>,
309        S: Into<LocalizedText>,
310        U: Into<NodeId>,
311        V: Into<Variant>,
312    {
313        VariableBuilder::new(&property_id.into(), browse_name, display_name)
314            .property_of(event_id.clone())
315            .has_type_definition(VariableTypeId::PropertyType)
316            .data_type(data_type)
317            .value(value)
318            .insert(address_space);
319    }
320
321    pub fn message<T>(mut self, message: T) -> Self
322    where
323        T: Into<LocalizedText>,
324    {
325        self.message = message.into();
326        self
327    }
328
329    pub fn source_node<T>(mut self, source_node: T) -> Self
330    where
331        T: Into<NodeId>,
332    {
333        self.source_node = source_node.into();
334        self
335    }
336
337    pub fn source_name<T>(mut self, source_name: T) -> Self
338    where
339        T: Into<UAString>,
340    {
341        self.source_name = source_name.into();
342        self
343    }
344
345    pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> Self {
346        self.local_time = local_time;
347        self
348    }
349
350    pub fn severity(mut self, severity: u16) -> Self {
351        self.severity = severity;
352        self
353    }
354
355    pub fn receive_time(mut self, receive_time: DateTime) -> Self {
356        self.receive_time = receive_time;
357        self
358    }
359
360    pub fn properties(&self) -> &Vec<(LocalizedText, Variant)> {
361        &self.properties
362    }
363}
364
/// Macro for event types that aggregate a BaseEventType in a field and want to expose its
/// builder functions as their own.
///
/// `$event` is the name of the aggregating type and `$base` is the name of the field that
/// every generated function forwards to.
macro_rules! base_event_impl {
    ( $event:ident, $base:ident ) => {
        impl $event {
            /// Adds a property by forwarding to the base event.
            pub fn add_property<T, R, S, U, V>(
                &mut self,
                event_id: &NodeId,
                property_id: T,
                browse_name: R,
                display_name: S,
                data_type: U,
                value: V,
                address_space: &mut AddressSpace,
            ) where
                T: Into<NodeId>,
                R: Into<QualifiedName>,
                S: Into<LocalizedText>,
                U: Into<NodeId>,
                V: Into<Variant>,
            {
                self.$base.add_property(
                    event_id,
                    property_id,
                    browse_name,
                    display_name,
                    data_type,
                    value,
                    address_space,
                );
            }

            /// Sets the message on the base event.
            pub fn message<T>(mut self, message: T) -> Self
            where
                T: Into<LocalizedText>,
            {
                self.$base = self.$base.message(message);
                self
            }

            /// Sets the source node on the base event.
            pub fn source_node<T>(mut self, source_node: T) -> Self
            where
                T: Into<NodeId>,
            {
                self.$base = self.$base.source_node(source_node);
                self
            }

            /// Sets the source name on the base event.
            pub fn source_name<T>(mut self, source_name: T) -> Self
            where
                T: Into<UAString>,
            {
                self.$base = self.$base.source_name(source_name);
                self
            }

            /// Sets the optional local time on the base event.
            pub fn local_time(mut self, local_time: Option<TimeZoneDataType>) -> Self {
                self.$base = self.$base.local_time(local_time);
                self
            }

            /// Sets the severity on the base event.
            pub fn severity(mut self, severity: u16) -> Self {
                self.$base = self.$base.severity(severity);
                self
            }

            /// Sets the receive time on the base event.
            pub fn receive_time(mut self, receive_time: DateTime) -> Self {
                self.$base = self.$base.receive_time(receive_time);
                self
            }
        }
    };
}
438
439fn event_source_node(event_id: &NodeId, address_space: &AddressSpace) -> Option<NodeId> {
440    if let Ok(event_time_node) =
441        find_node_from_browse_path(address_space, event_id, &["SourceNode".into()])
442    {
443        if let Some(value) = event_time_node.as_node().get_attribute(
444            TimestampsToReturn::Neither,
445            AttributeId::Value,
446            NumericRange::None,
447            &QualifiedName::null(),
448        ) {
449            if let Some(value) = value.value {
450                match value {
451                    Variant::NodeId(node_id) => Some(*node_id),
452                    _ => None,
453                }
454            } else {
455                None
456            }
457        } else {
458            None
459        }
460    } else {
461        None
462    }
463}
464
465fn event_time(event_id: &NodeId, address_space: &AddressSpace) -> Option<DateTime> {
466    // Find the Time variable under the event to return a timestamp.
467    if let Ok(event_time_node) =
468        find_node_from_browse_path(address_space, event_id, &["Time".into()])
469    {
470        if let Some(value) = event_time_node.as_node().get_attribute(
471            TimestampsToReturn::Neither,
472            AttributeId::Value,
473            NumericRange::None,
474            &QualifiedName::null(),
475        ) {
476            if let Some(value) = value.value {
477                match value {
478                    Variant::DateTime(date_time) => Some(*date_time),
479                    _ => None,
480                }
481            } else {
482                None
483            }
484        } else {
485            None
486        }
487    } else {
488        None
489    }
490}
491
492/// Attempts to find events that were emitted by the source object based upon a time predicate
493pub fn filter_events<T, R, F>(
494    source_object_id: T,
495    event_type_id: R,
496    address_space: &AddressSpace,
497    time_predicate: F,
498) -> Option<Vec<NodeId>>
499where
500    T: Into<NodeId>,
501    R: Into<NodeId>,
502    F: Fn(&DateTimeUtc) -> bool,
503{
504    let event_type_id = event_type_id.into();
505    let source_object_id = source_object_id.into();
506    // Find events of type event_type_id
507    if let Some(events) = address_space.find_objects_by_type(event_type_id, true) {
508        let event_ids = events
509            .iter()
510            .filter(move |event_id| {
511                let mut filter = false;
512                if let Some(source_node) = event_source_node(event_id, address_space) {
513                    // Browse the relative path for the "Time" variable
514                    if let Some(event_time) = event_time(event_id, address_space) {
515                        // Filter on those happened since the time
516                        if time_predicate(&event_time.as_chrono()) {
517                            // Whose source node is source_object_id
518                            filter = source_node == source_object_id
519                        }
520                    }
521                }
522                filter
523            })
524            .cloned()
525            .collect::<Vec<NodeId>>();
526        if event_ids.is_empty() {
527            None
528        } else {
529            Some(event_ids)
530        }
531    } else {
532        None
533    }
534}
535
536pub fn purge_events<T, R>(
537    source_object_id: T,
538    event_type_id: R,
539    address_space: &mut AddressSpace,
540    happened_before: &DateTimeUtc,
541) -> usize
542where
543    T: Into<NodeId>,
544    R: Into<NodeId>,
545{
546    if let Some(events) = filter_events(
547        source_object_id,
548        event_type_id,
549        address_space,
550        move |event_time| event_time < happened_before,
551    ) {
552        // Delete these events from the address space
553        info!("Deleting some events from the address space");
554        let len = events.len();
555        events.into_iter().for_each(|node_id| {
556            debug!("Deleting event {}", node_id);
557            address_space.delete(&node_id, true);
558        });
559        len
560    } else {
561        0
562    }
563}
564
565/// Searches for events of the specified event type which reference the source object
566pub fn events_for_object<T>(
567    source_object_id: T,
568    address_space: &AddressSpace,
569    happened_since: &DateTimeUtc,
570) -> Option<Vec<NodeId>>
571where
572    T: Into<NodeId>,
573{
574    filter_events(
575        source_object_id,
576        ObjectTypeId::BaseEventType,
577        address_space,
578        move |event_time| event_time >= happened_since,
579    )
580}
581
/// Raising an event with a source node must make that node readable via the helper.
#[test]
fn test_event_source_node() {
    let mut address_space = AddressSpace::new();
    let ns = address_space.register_namespace("urn:test").unwrap();
    let source = ObjectId::Server_ServerCapabilities;

    // Raise an event that names the source
    let event_id = NodeId::next_numeric(ns);
    let mut event = BaseEventType::new(
        &event_id,
        ObjectTypeId::BaseEventType,
        "Event1",
        "",
        NodeId::objects_folder_id(),
        DateTime::now(),
    )
    .source_node(source);
    assert!(event.raise(&mut address_space).is_ok());

    // The helper fn should return the source node the event was raised with
    let expected: NodeId = source.into();
    let actual = event_source_node(&event_id, &address_space).unwrap();
    assert_eq!(actual, expected);
}
605
/// Raising an event must make its time readable via the helper.
#[test]
fn test_event_time() {
    let mut address_space = AddressSpace::new();
    let ns = address_space.register_namespace("urn:test").unwrap();

    // Raise an event
    let event_id = NodeId::next_numeric(ns);
    let mut event = BaseEventType::new(
        &event_id,
        ObjectTypeId::BaseEventType,
        "Event1",
        "",
        NodeId::objects_folder_id(),
        DateTime::now(),
    )
    .source_node(ObjectId::Server_ServerCapabilities);
    // Remember the time the event carries before it is raised
    let expected_time = event.time;
    assert!(event.raise(&mut address_space).is_ok());

    // The helper fn should return the time the event was created with
    let actual_time = event_time(&event_id, &address_space).unwrap();
    assert_eq!(actual_time, expected_time);
}
630
/// An event raised after a point in time must be found when searching from that point.
#[test]
fn test_events_for_object() {
    let mut address_space = AddressSpace::new();
    let ns = address_space.register_namespace("urn:test").unwrap();
    let source = ObjectId::Server_ServerCapabilities;

    // Note the time first, then raise an event
    let happened_since = chrono::Utc::now();
    let event_id = NodeId::next_numeric(ns);
    let mut event = BaseEventType::new(
        &event_id,
        ObjectTypeId::BaseEventType,
        "Event1",
        "",
        NodeId::objects_folder_id(),
        DateTime::now(),
    )
    .source_node(source);
    assert!(event.raise(&mut address_space).is_ok());

    // The event, and only the event, should be found
    let events = events_for_object(source, &address_space, &happened_since).unwrap();
    assert_eq!(events, vec![event_id]);
}
661
/// Raises a series of events, purges the older half and checks that nothing of the purged
/// events remains in the address space while the newer half and the source node survive.
#[test]
fn test_purge_events() {
    use opcua_console_logging;
    use opcua_types::Identifier;

    opcua_console_logging::init();

    let mut address_space = AddressSpace::new();

    // Nodes will be created in this namespace
    let ns = address_space.register_namespace("urn:mynamespace").unwrap();

    // Takes the next numeric node id in the namespace and returns its numeric value.
    let next_numeric_value = || match NodeId::next_numeric(ns).identifier {
        Identifier::Numeric(value) => value,
        _ => panic!(),
    };

    // Raising events creates a bunch of numeric node ids for their properties. Find out the
    // node id that the first property is most likely to have. If tests that use
    // next_numeric() run concurrently, some ids in the range will not belong to this test,
    // but that does not matter.
    let first_node_id = next_numeric_value() + 1;

    let source_node = ObjectId::Server_ServerCapabilities;
    let event_type_id = ObjectTypeId::BaseEventType;

    // Raise a bunch of events, 5 minutes apart
    let start_time = DateTime::now().as_chrono();
    let mut time = start_time;
    let mut last_purged_node_id = 0;

    for i in 0..10 {
        let event_id = NodeId::new(ns, format!("Event{}", i));
        let event_name = format!("Event {}", i);
        let mut event = BaseEventType::new(
            &event_id,
            event_type_id,
            event_name,
            "",
            NodeId::objects_folder_id(),
            DateTime::from(time),
        )
        .source_node(source_node);
        assert!(event.raise(&mut address_space).is_ok());

        // The first 5 events will be purged, so note the last node id here because none of
        // the ids between start and end should survive when tested.
        if i == 4 {
            last_purged_node_id = next_numeric_value();
        }

        time = time + chrono::Duration::minutes(5);
    }

    // Expect all events
    let events = events_for_object(source_node, &address_space, &start_time).unwrap();
    assert_eq!(events.len(), 10);

    // Purge all events up to halfway
    let happened_before = start_time + chrono::Duration::minutes(25);
    let purged = purge_events(
        source_node,
        ObjectTypeId::BaseEventType,
        &mut address_space,
        &happened_before,
    );
    assert_eq!(purged, 5);

    // Should have only 5 events left
    let events = events_for_object(source_node, &address_space, &start_time).unwrap();
    assert_eq!(events.len(), 5);

    // There should be NO reference left to any of the purged events in the address space,
    // while the surviving events are still referenced
    let references = address_space.references();
    for i in 0..5 {
        let event_id = NodeId::new(ns, format!("Event{}", i));
        assert!(!references.reference_to_node_exists(&event_id));
    }
    for i in 5..10 {
        let event_id = NodeId::new(ns, format!("Event{}", i));
        assert!(references.reference_to_node_exists(&event_id));
    }

    // The node that generated the events should not be purged
    // This was a bug during development
    let source_node: NodeId = source_node.into();
    debug!("Expecting to still find source node {}", source_node);
    assert!(address_space.find_node(&source_node).is_some());

    // All of the properties that were created for purged events fall between the first and
    // last node id. None of them should exist now either - just scan over the range of
    // numbers these nodes reside in.
    for i in first_node_id..last_purged_node_id {
        // Event properties were numerically assigned from the NS
        let node_id = NodeId::new(ns, i);
        assert!(address_space.find_node(&node_id).is_none());
        assert!(!references.reference_to_node_exists(&node_id));
    }
}
768}