1use std::sync::Arc;
25
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28use tracing::{debug, trace};
29use uuid::Uuid;
30
31use crate::nodes::reference::ReferenceTypeId;
32use crate::nodes::{AddressSpace, BrowseDirection, QualifiedName};
33use crate::sdk::address_space::{BrowsePort, TypeHierarchyPort};
34use crate::sdk::subscription::EventSubscriptionPort;
35use crate::sdk::subscription::SubscriptionManager;
36use crate::types::{AttributeId, NodeId, Variant};
37
/// Addresses one event field by browse path.
///
/// Used as a select clause in [`EventFilter::select_clauses`] and as a where-clause
/// operand via [`FilterOperand::SimpleAttribute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimpleAttributeOperand {
    /// Type definition node the operand refers to. Stored with the operand but not
    /// consulted by [`EventData::resolve_field`].
    pub type_definition_id: NodeId,
    /// Qualified names identifying the field. [`EventData::resolve_field`] returns
    /// `Variant::Null` for an empty path.
    pub browse_path: Vec<QualifiedName>,
    /// Attribute selector. Stored with the operand but not consulted by the resolution
    /// code in this file.
    pub attribute_id: AttributeId,
    /// Index range text. Stored with the operand but not consulted by the resolution
    /// code in this file.
    pub index_range: String,
}
59
/// One operand of a [`ContentFilterElement`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FilterOperand {
    /// A constant value carried by the filter itself.
    Literal(Variant),
    /// A value looked up on the event through [`EventData::resolve_field`].
    SimpleAttribute(SimpleAttributeOperand),
    /// Index of another element in the same where clause. The `And`, `Or` and `Not`
    /// operators evaluate the referenced element; where a plain value is expected this
    /// operand yields `Variant::Null`.
    Element(u32),
}
72
/// A single node of a where clause: an operator applied to its operands.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentFilterElement {
    /// Operator deciding how the operands are interpreted.
    pub filter_operator: FilterOperator,
    /// Operands in positional order; comparison operators read the first two, while
    /// `OfType`, `Not` and `IsNull` read only the first.
    pub filter_operands: Vec<FilterOperand>,
}
83
/// Operators usable in a [`ContentFilterElement`], with explicit `u32` discriminants
/// `0..=17` (see [`FilterOperator::from_u32`] for the reverse mapping).
///
/// The event manager in this file evaluates `Equals`, `IsNull`, `GreaterThan`,
/// `LessThan`, `GreaterThanOrEqual`, `LessThanOrEqual`, `Not`, `And`, `Or` and `OfType`;
/// every other operator is accepted but treated as always matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u32)]
pub enum FilterOperator {
    Equals = 0,
    IsNull = 1,
    GreaterThan = 2,
    LessThan = 3,
    GreaterThanOrEqual = 4,
    LessThanOrEqual = 5,
    Like = 6,
    Not = 7,
    Between = 8,
    InList = 9,
    And = 10,
    Or = 11,
    Cast = 12,
    InView = 13,
    OfType = 14,
    RelatedTo = 15,
    BitwiseAnd = 16,
    BitwiseOr = 17,
}
109
110impl FilterOperator {
111 pub fn from_u32(value: u32) -> Option<Self> {
113 match value {
114 0 => Some(Self::Equals),
115 1 => Some(Self::IsNull),
116 2 => Some(Self::GreaterThan),
117 3 => Some(Self::LessThan),
118 4 => Some(Self::GreaterThanOrEqual),
119 5 => Some(Self::LessThanOrEqual),
120 6 => Some(Self::Like),
121 7 => Some(Self::Not),
122 8 => Some(Self::Between),
123 9 => Some(Self::InList),
124 10 => Some(Self::And),
125 11 => Some(Self::Or),
126 12 => Some(Self::Cast),
127 13 => Some(Self::InView),
128 14 => Some(Self::OfType),
129 15 => Some(Self::RelatedTo),
130 16 => Some(Self::BitwiseAnd),
131 17 => Some(Self::BitwiseOr),
132 _ => None,
133 }
134 }
135}
136
/// Filter attached to an event monitored item: which fields to report and which
/// events to let through.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventFilter {
    /// Fields to report for each delivered event; one output value is produced per
    /// clause, in this order.
    pub select_clauses: Vec<SimpleAttributeOperand>,
    /// Content filter elements evaluated starting at index 0. An empty clause lets
    /// every event through.
    pub where_clause: Vec<ContentFilterElement>,
}
149
150impl EventFilter {
151 pub fn new() -> Self {
153 Self {
154 select_clauses: Vec::new(),
155 where_clause: Vec::new(),
156 }
157 }
158
159 pub fn base_event_type_filter() -> Self {
163 let base_type = NodeId::numeric(0, 2041); let field_names = [
166 "EventId",
167 "EventType",
168 "SourceNode",
169 "SourceName",
170 "Time",
171 "ReceiveTime",
172 "Message",
173 "Severity",
174 ];
175
176 let select_clauses = field_names
177 .iter()
178 .map(|name| SimpleAttributeOperand {
179 type_definition_id: base_type.clone(),
180 browse_path: vec![QualifiedName::new(0, *name)],
181 attribute_id: AttributeId::Value,
182 index_range: String::new(),
183 })
184 .collect();
185
186 Self {
187 select_clauses,
188 where_clause: Vec::new(),
189 }
190 }
191}
192
/// The default filter is the empty filter produced by [`EventFilter::new`].
impl Default for EventFilter {
    fn default() -> Self {
        // Delegates so `new()` and `default()` can never drift apart.
        Self::new()
    }
}
198
/// The selected field values of one event for one monitored item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventFieldList {
    /// Client handle of the monitored item the values are reported for.
    pub client_handle: u32,
    /// One value per select clause of the item's filter, in clause order. Fields that
    /// could not be resolved are reported as `Variant::Null`.
    pub event_fields: Vec<Variant>,
}
214
/// An [`EventFieldList`] paired with the id of a subscription.
///
/// This type is declared here but not constructed anywhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventNotification {
    /// Id of the subscription.
    pub subscription_id: u32,
    /// Field values of the event.
    pub field_list: EventFieldList,
}
223
/// In-memory representation of one event occurrence.
///
/// The first eight fields back the standard field names understood by
/// [`EventData::resolve_field`]; anything else lives in `custom_fields`.
#[derive(Debug, Clone)]
pub struct EventData {
    /// Event id bytes; [`EventData::new`] fills this with the bytes of a random UUID.
    pub event_id: Vec<u8>,
    /// Node id of the event's type.
    pub event_type: NodeId,
    /// Node id of the event's source.
    pub source_node: NodeId,
    /// Name of the event's source.
    pub source_name: String,
    /// Event time; [`EventData::new`] sets it to the creation instant (UTC).
    pub time: DateTime<Utc>,
    /// Receive time; [`EventData::new`] sets it to the same instant as `time`.
    pub receive_time: DateTime<Utc>,
    /// Event message text.
    pub message: String,
    /// Event severity.
    pub severity: u16,
    /// Additional fields as `(browse path, value)` pairs, searched in insertion order.
    pub custom_fields: Vec<(Vec<QualifiedName>, Variant)>,
}
246
247impl EventData {
248 pub fn new(
250 event_type: NodeId,
251 source_node: NodeId,
252 source_name: impl Into<String>,
253 severity: u16,
254 message: impl Into<String>,
255 ) -> Self {
256 let event_id = Uuid::new_v4().as_bytes().to_vec();
257 let now = Utc::now();
258
259 Self {
260 event_id,
261 event_type,
262 source_node,
263 source_name: source_name.into(),
264 time: now,
265 receive_time: now,
266 message: message.into(),
267 severity,
268 custom_fields: Vec::new(),
269 }
270 }
271
272 pub fn with_field(mut self, browse_path: Vec<QualifiedName>, value: Variant) -> Self {
274 self.custom_fields.push((browse_path, value));
275 self
276 }
277
278 pub fn resolve_field(&self, operand: &SimpleAttributeOperand) -> Variant {
283 if operand.browse_path.is_empty() {
284 return Variant::Null;
285 }
286
287 if operand.browse_path.len() == 1 {
289 let field_name = &operand.browse_path[0].name;
290 match field_name.as_str() {
291 "EventId" => return Variant::ByteString(self.event_id.clone()),
292 "EventType" => return Variant::NodeId(self.event_type.clone()),
293 "SourceNode" => return Variant::NodeId(self.source_node.clone()),
294 "SourceName" => return Variant::String(self.source_name.clone()),
295 "Time" => return Variant::DateTime(self.time),
296 "ReceiveTime" => return Variant::DateTime(self.receive_time),
297 "Message" => return Variant::String(self.message.clone()),
298 "Severity" => return Variant::UInt16(self.severity),
299 _ => {}
300 }
301 }
302
303 for (path, value) in &self.custom_fields {
305 if paths_match(path, &operand.browse_path) {
306 return value.clone();
307 }
308 }
309
310 Variant::Null
312 }
313}
314
315fn paths_match(a: &[QualifiedName], b: &[QualifiedName]) -> bool {
317 if a.len() != b.len() {
318 return false;
319 }
320 a.iter()
321 .zip(b.iter())
322 .all(|(qa, qb)| qa.namespace_index == qb.namespace_index && qa.name == qb.name)
323}
324
/// Numeric identifier of the base event type. [`EventFilter::base_event_type_filter`]
/// uses the same value (in namespace 0) as the type definition of its select clauses.
pub const BASE_EVENT_TYPE_ID: u32 = 2041;
331
/// Fires events and delivers them to the event monitored items of every subscription.
///
/// All collaborators are held as trait objects.
pub struct EventManager {
    /// Lists subscription ids, returns each subscription's event monitored items and
    /// accepts the resulting notifications.
    subscription_port: Arc<dyn EventSubscriptionPort>,
    /// Used to read inverse references of an event type when checking `OfType`.
    browse_port: Arc<dyn BrowsePort>,
    /// Answers subtype queries when checking `OfType`.
    type_hierarchy_port: Arc<dyn TypeHierarchyPort>,
}
348
349impl EventManager {
350 pub fn new(
352 subscription_manager: Arc<SubscriptionManager>,
353 address_space: Arc<AddressSpace>,
354 ) -> Self {
355 Self {
356 subscription_port: subscription_manager,
357 browse_port: address_space.clone(),
358 type_hierarchy_port: address_space,
359 }
360 }
361
362 pub fn fire_event(
372 &self,
373 source_node: &NodeId,
374 event_type: &NodeId,
375 severity: u16,
376 message: &str,
377 ) {
378 let event_data = EventData::new(
379 event_type.clone(),
380 source_node.clone(),
381 source_node.to_string(),
382 severity,
383 message,
384 );
385
386 self.fire_event_with_data(&event_data);
387 }
388
389 pub fn fire_event_with_data(&self, event_data: &EventData) {
391 debug!(
392 source_node = %event_data.source_node,
393 event_type = %event_data.event_type,
394 severity = event_data.severity,
395 message = %event_data.message,
396 "Firing event"
397 );
398
399 let sub_ids = self.subscription_port.subscription_ids();
400
401 for sub_id in &sub_ids {
402 self.distribute_to_subscription(*sub_id, event_data);
403 }
404 }
405
406 fn distribute_to_subscription(&self, subscription_id: u32, event_data: &EventData) {
408 let event_items = self
410 .subscription_port
411 .get_event_monitored_items(subscription_id);
412
413 for (client_handle, filter) in &event_items {
414 if !self.evaluate_where_clause(&filter.where_clause, event_data) {
416 trace!(
417 subscription_id,
418 client_handle,
419 "Event filtered out by where clause"
420 );
421 continue;
422 }
423
424 let event_fields: Vec<Variant> = filter
426 .select_clauses
427 .iter()
428 .map(|clause| event_data.resolve_field(clause))
429 .collect();
430
431 let field_list = EventFieldList {
432 client_handle: *client_handle,
433 event_fields,
434 };
435
436 self.subscription_port
438 .push_event_notification(subscription_id, field_list);
439 }
440 }
441
442 fn evaluate_where_clause(
446 &self,
447 where_clause: &[ContentFilterElement],
448 event_data: &EventData,
449 ) -> bool {
450 if where_clause.is_empty() {
451 return true; }
453
454 self.evaluate_element(where_clause, 0, event_data)
456 }
457
458 fn evaluate_element(
460 &self,
461 elements: &[ContentFilterElement],
462 index: usize,
463 event_data: &EventData,
464 ) -> bool {
465 let Some(element) = elements.get(index) else {
466 return true; };
468
469 match element.filter_operator {
470 FilterOperator::OfType => {
471 if let Some(operand) = element.filter_operands.first() {
473 if let Some(type_id) = self.resolve_operand_as_node_id(operand, event_data) {
474 return self.is_subtype_of(&event_data.event_type, &type_id);
475 }
476 }
477 true }
479 FilterOperator::Equals => {
480 self.evaluate_comparison(elements, element, event_data, |a, b| a == b)
481 }
482 FilterOperator::GreaterThan => {
483 self.evaluate_comparison(elements, element, event_data, |a, b| {
484 compare_variants(a, b).map_or(false, |ord| ord == std::cmp::Ordering::Greater)
485 })
486 }
487 FilterOperator::LessThan => {
488 self.evaluate_comparison(elements, element, event_data, |a, b| {
489 compare_variants(a, b).map_or(false, |ord| ord == std::cmp::Ordering::Less)
490 })
491 }
492 FilterOperator::GreaterThanOrEqual => {
493 self.evaluate_comparison(elements, element, event_data, |a, b| {
494 compare_variants(a, b).map_or(false, |ord| ord != std::cmp::Ordering::Less)
495 })
496 }
497 FilterOperator::LessThanOrEqual => {
498 self.evaluate_comparison(elements, element, event_data, |a, b| {
499 compare_variants(a, b).map_or(false, |ord| ord != std::cmp::Ordering::Greater)
500 })
501 }
502 FilterOperator::And => {
503 element.filter_operands.iter().all(|op| {
505 if let FilterOperand::Element(idx) = op {
506 self.evaluate_element(elements, *idx as usize, event_data)
507 } else {
508 true
509 }
510 })
511 }
512 FilterOperator::Or => {
513 element.filter_operands.iter().any(|op| {
515 if let FilterOperand::Element(idx) = op {
516 self.evaluate_element(elements, *idx as usize, event_data)
517 } else {
518 false
519 }
520 })
521 }
522 FilterOperator::Not => {
523 if let Some(FilterOperand::Element(idx)) = element.filter_operands.first() {
525 !self.evaluate_element(elements, *idx as usize, event_data)
526 } else {
527 true
528 }
529 }
530 FilterOperator::IsNull => {
531 if let Some(operand) = element.filter_operands.first() {
532 let value = self.resolve_operand_as_variant(operand, event_data);
533 matches!(value, Variant::Null)
534 } else {
535 true
536 }
537 }
538 _ => true,
540 }
541 }
542
543 fn evaluate_comparison<F>(
545 &self,
546 _elements: &[ContentFilterElement],
547 element: &ContentFilterElement,
548 event_data: &EventData,
549 cmp_fn: F,
550 ) -> bool
551 where
552 F: Fn(&Variant, &Variant) -> bool,
553 {
554 if element.filter_operands.len() < 2 {
555 return true; }
557 let left = self.resolve_operand_as_variant(&element.filter_operands[0], event_data);
558 let right = self.resolve_operand_as_variant(&element.filter_operands[1], event_data);
559 cmp_fn(&left, &right)
560 }
561
562 fn resolve_operand_as_variant(
564 &self,
565 operand: &FilterOperand,
566 event_data: &EventData,
567 ) -> Variant {
568 match operand {
569 FilterOperand::Literal(v) => v.clone(),
570 FilterOperand::SimpleAttribute(attr) => event_data.resolve_field(attr),
571 FilterOperand::Element(_) => Variant::Null, }
573 }
574
575 fn resolve_operand_as_node_id(
577 &self,
578 operand: &FilterOperand,
579 event_data: &EventData,
580 ) -> Option<NodeId> {
581 match operand {
582 FilterOperand::Literal(Variant::NodeId(nid)) => Some(nid.clone()),
583 FilterOperand::SimpleAttribute(attr) => match event_data.resolve_field(attr) {
584 Variant::NodeId(nid) => Some(nid),
585 _ => None,
586 },
587 _ => None,
588 }
589 }
590
591 fn is_subtype_of(&self, event_type: &NodeId, target_type: &NodeId) -> bool {
593 if self
594 .type_hierarchy_port
595 .is_node_subtype_of(event_type, target_type)
596 {
597 return true;
598 }
599
600 self.browse_port
601 .get_references(event_type, BrowseDirection::Inverse)
602 .iter()
603 .any(|reference| {
604 reference.reference_type_id == ReferenceTypeId::HasSubtype
605 && reference.target_node_id == *target_type
606 })
607 }
608}
609
610fn compare_variants(a: &Variant, b: &Variant) -> Option<std::cmp::Ordering> {
612 match (a, b) {
613 (Variant::Double(a), Variant::Double(b)) => a.partial_cmp(b),
614 (Variant::Float(a), Variant::Float(b)) => a.partial_cmp(b),
615 (Variant::Int32(a), Variant::Int32(b)) => a.partial_cmp(b),
616 (Variant::UInt32(a), Variant::UInt32(b)) => a.partial_cmp(b),
617 (Variant::Int64(a), Variant::Int64(b)) => a.partial_cmp(b),
618 (Variant::UInt64(a), Variant::UInt64(b)) => a.partial_cmp(b),
619 (Variant::Int16(a), Variant::Int16(b)) => a.partial_cmp(b),
620 (Variant::UInt16(a), Variant::UInt16(b)) => a.partial_cmp(b),
621 (Variant::String(a), Variant::String(b)) => a.partial_cmp(b),
622 (Variant::DateTime(a), Variant::DateTime(b)) => a.partial_cmp(b),
623 (a, b) => {
625 let af = a.as_f64()?;
626 let bf = b.as_f64()?;
627 af.partial_cmp(&bf)
628 }
629 }
630}
631
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the event shared by the field-resolution tests.
    fn sample_event(severity: u16, message: &str) -> EventData {
        let event_type = NodeId::numeric(0, 2041);
        let source_node = NodeId::numeric(2, 1001);
        EventData::new(event_type, source_node, "TestSource", severity, message)
    }

    /// Wraps one browse name in a `Value` operand on type definition 2041.
    fn value_operand(browse_name: QualifiedName) -> SimpleAttributeOperand {
        SimpleAttributeOperand {
            type_definition_id: NodeId::numeric(0, 2041),
            browse_path: vec![browse_name],
            attribute_id: AttributeId::Value,
            index_range: String::new(),
        }
    }

    #[test]
    fn test_event_data_resolve_field() {
        let event = sample_event(500, "Test event message");
        let operand = value_operand(QualifiedName::new(0, "Message"));

        assert_eq!(
            event.resolve_field(&operand),
            Variant::String("Test event message".to_string())
        );
    }

    #[test]
    fn test_event_data_resolve_severity() {
        let event = sample_event(750, "High severity event");
        let operand = value_operand(QualifiedName::new(0, "Severity"));

        assert_eq!(event.resolve_field(&operand), Variant::UInt16(750));
    }

    #[test]
    fn test_event_data_unknown_field() {
        let event = sample_event(500, "Test");
        let operand = value_operand(QualifiedName::new(0, "NonExistentField"));

        assert_eq!(event.resolve_field(&operand), Variant::Null);
    }

    #[test]
    fn test_event_data_custom_field() {
        let custom_path = vec![QualifiedName::new(2, "CustomField")];
        let event = sample_event(500, "Test").with_field(custom_path, Variant::Double(42.0));
        let operand = value_operand(QualifiedName::new(2, "CustomField"));

        assert_eq!(event.resolve_field(&operand), Variant::Double(42.0));
    }

    #[test]
    fn test_event_filter_base_type() {
        let filter = EventFilter::base_event_type_filter();
        let names: Vec<&str> = filter
            .select_clauses
            .iter()
            .map(|clause| clause.browse_path[0].name.as_str())
            .collect();

        assert_eq!(names.len(), 8);
        assert_eq!(names[0], "EventId");
        assert_eq!(names[7], "Severity");
    }

    #[test]
    fn test_filter_operator_from_u32() {
        let cases = [
            (0, Some(FilterOperator::Equals)),
            (14, Some(FilterOperator::OfType)),
            (99, None),
        ];

        for (raw, expected) in cases {
            assert_eq!(FilterOperator::from_u32(raw), expected);
        }
    }

    #[test]
    fn test_compare_variants() {
        use std::cmp::Ordering;

        let cases = [
            (Variant::Double(1.0), Variant::Double(2.0), Ordering::Less),
            (Variant::Int32(5), Variant::Int32(5), Ordering::Equal),
            (
                Variant::String("b".into()),
                Variant::String("a".into()),
                Ordering::Greater,
            ),
        ];

        for (left, right, expected) in cases {
            assert_eq!(compare_variants(&left, &right), Some(expected));
        }
    }

    #[test]
    fn test_paths_match() {
        let reference = vec![QualifiedName::new(0, "Foo"), QualifiedName::new(0, "Bar")];
        let identical = vec![QualifiedName::new(0, "Foo"), QualifiedName::new(0, "Bar")];
        let shorter = vec![QualifiedName::new(0, "Foo")];
        let other_namespace = vec![QualifiedName::new(1, "Foo"), QualifiedName::new(0, "Bar")];

        assert!(paths_match(&reference, &identical));
        assert!(!paths_match(&reference, &shorter));
        assert!(!paths_match(&reference, &other_namespace));
    }
}