1use std::collections::{BTreeSet, VecDeque};
2
3use chrono::TimeDelta;
4use opcua_nodes::{Event, ParsedEventFilter, TypeTree};
5use tracing::{error, warn};
6
7use super::MonitoredItemHandle;
8use crate::{info::ServerInfo, node_manager::ParsedReadValueId};
9use opcua_types::{
10 match_extension_object_owned, DataChangeFilter, DataValue, DateTime, EventFieldList,
11 EventFilter, EventFilterResult, ExtensionObject, MonitoredItemCreateRequest,
12 MonitoredItemModifyRequest, MonitoredItemNotification, MonitoringMode, NumericRange,
13 ParsedDataChangeFilter, StatusCode, TimestampsToReturn, Variant,
14};
15
/// An entry in a monitored item's notification queue.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Notification {
    /// A notification carrying a client handle and a data value.
    MonitoredItemNotification(MonitoredItemNotification),
    /// A notification carrying an event field list.
    Event(EventFieldList),
}
21
22impl From<MonitoredItemNotification> for Notification {
23 fn from(v: MonitoredItemNotification) -> Self {
24 Notification::MonitoredItemNotification(v)
25 }
26}
27
28impl From<EventFieldList> for Notification {
29 fn from(v: EventFieldList) -> Self {
30 Notification::Event(v)
31 }
32}
33
34fn parse_sampling_interval(int: f64) -> SamplingInterval {
35 match int {
36 ..0.0 => SamplingInterval::Subscription,
37 0.0 => SamplingInterval::Zero,
38 _ => {
39 if !int.is_finite() {
40 warn!("Invalid sampling interval {}, using maximum", int);
41 return SamplingInterval::NonZero(TimeDelta::MAX);
42 }
43 let raw = int * 1000.0;
44 SamplingInterval::NonZero(if raw > i64::MAX as f64 {
45 TimeDelta::MAX
46 } else {
47 TimeDelta::microseconds(raw as i64)
48 })
49 }
50 }
51}
52
/// The parsed filter attached to a monitored item.
#[derive(Debug, Clone)]
pub enum FilterType {
    /// No filter was supplied.
    None,
    /// A parsed data change filter.
    DataChangeFilter(ParsedDataChangeFilter),
    /// A parsed event filter.
    EventFilter(ParsedEventFilter),
}
60
61impl FilterType {
62 pub fn from_filter(
65 filter: ExtensionObject,
66 eu_range: Option<(f64, f64)>,
67 type_tree: &dyn TypeTree,
68 ) -> (Option<EventFilterResult>, Result<FilterType, StatusCode>) {
69 if filter.is_null() {
71 return (None, Ok(FilterType::None));
72 }
73
74 match_extension_object_owned!(filter,
75 v: DataChangeFilter => {
76 let res = ParsedDataChangeFilter::parse(v, eu_range);
77 (None, res.map(FilterType::DataChangeFilter))
78 },
79 v: EventFilter => {
80 let (res, filter_res) = ParsedEventFilter::new(v, type_tree);
81 (Some(res), filter_res.map(FilterType::EventFilter))
82 },
83 _ => {
84 error!(
85 "Requested data filter type is not supported: {}",
86 filter
87 .body
88 .as_ref()
89 .map(|b| b.type_name())
90 .unwrap_or("Unknown")
91 );
92 (None, Err(StatusCode::BadFilterNotAllowed))
93 }
94 )
95 }
96}
97
/// A parsed and sanitized request to create a monitored item.
///
/// Built by `CreateMonitoredItem::new` and later turned into a `MonitoredItem`
/// by `MonitoredItem::new`.
#[derive(Debug)]
pub struct CreateMonitoredItem {
    /// Id of the monitored item.
    id: u32,
    /// Id of the subscription the item belongs to.
    subscription_id: u32,
    /// The parsed item to monitor; a null value if parsing the request failed.
    item_to_monitor: ParsedReadValueId,
    /// Monitoring mode taken from the request.
    monitoring_mode: MonitoringMode,
    /// Client handle taken from the request.
    client_handle: u32,
    /// Discard policy taken from the request.
    discard_oldest: bool,
    /// Queue size after sanitizing against the server limits.
    queue_size: usize,
    /// Sampling interval after sanitizing against the server limits.
    sampling_interval: f64,
    /// Initial value, if one has been set through `set_initial_value`.
    initial_value: Option<DataValue>,
    /// Current status of the request; starts out as `BadNodeIdUnknown` unless
    /// parsing the filter or the item to monitor failed.
    status_code: StatusCode,
    /// The parsed filter, `FilterType::None` if parsing failed.
    filter: FilterType,
    /// Event filter result, present only if the request held an event filter.
    filter_res: Option<EventFilterResult>,
    /// Which timestamps to return.
    timestamps_to_return: TimestampsToReturn,
    /// Optional range forwarded to the data change filter parser.
    eu_range: Option<(f64, f64)>,
}
116
117fn sanitize_sampling_interval(info: &ServerInfo, requested_sampling_interval: f64) -> f64 {
120 if requested_sampling_interval < 0.0 || !requested_sampling_interval.is_finite() {
121 -1.0
124 } else if requested_sampling_interval == 0.0
125 || requested_sampling_interval < info.config.limits.subscriptions.min_sampling_interval_ms
126 {
127 info.config.limits.subscriptions.min_sampling_interval_ms
128 } else {
129 requested_sampling_interval
130 }
131}
132
133fn sanitize_queue_size(info: &ServerInfo, requested_queue_size: usize) -> usize {
135 if requested_queue_size == 0 || requested_queue_size == 1 {
136 1
139 } else if requested_queue_size
141 > info
142 .config
143 .limits
144 .subscriptions
145 .max_monitored_item_queue_size
146 {
147 info.config
148 .limits
149 .subscriptions
150 .max_monitored_item_queue_size
151 } else {
154 requested_queue_size
155 }
156}
157
158impl CreateMonitoredItem {
159 pub(crate) fn new(
160 req: MonitoredItemCreateRequest,
161 id: u32,
162 sub_id: u32,
163 info: &ServerInfo,
164 timestamps_to_return: TimestampsToReturn,
165 type_tree: &dyn TypeTree,
166 eu_range: Option<(f64, f64)>,
167 ) -> Self {
168 let (filter_res, filter) =
169 FilterType::from_filter(req.requested_parameters.filter, eu_range, type_tree);
170 let sampling_interval =
171 sanitize_sampling_interval(info, req.requested_parameters.sampling_interval);
172 let queue_size = sanitize_queue_size(info, req.requested_parameters.queue_size as usize);
173
174 let (filter, mut status) = match filter {
175 Ok(s) => (s, StatusCode::BadNodeIdUnknown),
176 Err(e) => (FilterType::None, e),
177 };
178
179 let item_to_monitor = match ParsedReadValueId::parse(req.item_to_monitor) {
180 Ok(r) => r,
181 Err(e) => {
182 status = e;
183 ParsedReadValueId::null()
184 }
185 };
186
187 Self {
188 id,
189 subscription_id: sub_id,
190 item_to_monitor,
191 monitoring_mode: req.monitoring_mode,
192 client_handle: req.requested_parameters.client_handle,
193 discard_oldest: req.requested_parameters.discard_oldest,
194 queue_size,
195 sampling_interval,
196 initial_value: None,
197 status_code: status,
198 filter,
199 timestamps_to_return,
200 filter_res,
201 eu_range,
202 }
203 }
204
205 pub fn handle(&self) -> MonitoredItemHandle {
207 MonitoredItemHandle {
208 monitored_item_id: self.id,
209 subscription_id: self.subscription_id,
210 }
211 }
212
213 pub fn set_initial_value(&mut self, value: DataValue) {
215 self.initial_value = Some(value);
216 }
217
218 pub fn set_status(&mut self, status: StatusCode) {
225 self.status_code = status;
226 }
227
228 pub fn item_to_monitor(&self) -> &ParsedReadValueId {
230 &self.item_to_monitor
231 }
232
233 pub fn monitoring_mode(&self) -> MonitoringMode {
235 self.monitoring_mode
236 }
237
238 pub fn sampling_interval(&self) -> f64 {
240 self.sampling_interval
241 }
242
243 pub fn queue_size(&self) -> usize {
245 self.queue_size
246 }
247
248 pub fn filter(&self) -> &FilterType {
250 &self.filter
251 }
252
253 pub fn revise_queue_size(&mut self, queue_size: usize) {
256 if queue_size < self.queue_size && queue_size > 0 || self.queue_size == 0 {
257 self.queue_size = queue_size;
258 }
259 }
260
261 pub fn revise_sampling_interval(&mut self, sampling_interval: f64) {
264 if sampling_interval < self.sampling_interval && sampling_interval > 0.0
265 || self.sampling_interval == 0.0
266 {
267 self.sampling_interval = sampling_interval;
268 }
269 }
270
271 pub fn timestamps_to_return(&self) -> TimestampsToReturn {
273 self.timestamps_to_return
274 }
275
276 pub fn status_code(&self) -> StatusCode {
278 self.status_code
279 }
280
281 pub(crate) fn filter_res(&self) -> Option<&EventFilterResult> {
282 self.filter_res.as_ref()
283 }
284}
285
/// State of a single monitored item, including its queue of pending
/// notifications.
#[derive(Debug)]
pub struct MonitoredItem {
    /// Id of the monitored item.
    id: u32,
    /// The item being monitored.
    item_to_monitor: ParsedReadValueId,
    /// Current monitoring mode.
    monitoring_mode: MonitoringMode,
    /// Ids of the items registered through `set_triggering`.
    triggered_items: BTreeSet<u32>,
    /// Client handle copied into each data value notification.
    client_handle: u32,
    /// Parsed sampling interval.
    sampling_interval: SamplingInterval,
    /// Filter applied to incoming values or events.
    filter: FilterType,
    /// When the queue is full: `true` drops the front entry, `false` replaces
    /// the back entry.
    discard_oldest: bool,
    /// Maximum number of queued notifications.
    queue_size: usize,
    /// Pending notifications; new entries are pushed to the back and popped
    /// from the front.
    notification_queue: VecDeque<Notification>,
    /// Set once a notification has been enqueued while the queue was full.
    queue_overflow: bool,
    /// Which timestamps are kept on enqueued data values.
    timestamps_to_return: TimestampsToReturn,
    /// The last data value that passed both the filter and the sampling check.
    last_data_value: Option<DataValue>,
    /// A value that passed the filter but arrived before the sampling interval
    /// had elapsed, kept for later delivery.
    sample_skipped_data_value: Option<DataValue>,
    /// Set when a notification is enqueued, cleared by `has_new_notifications`.
    any_new_notification: bool,
    /// Optional range forwarded to the data change filter parser on modify.
    eu_range: Option<(f64, f64)>,
}
310
/// Parsed form of a monitored item's sampling interval.
#[derive(Debug)]
pub(super) enum SamplingInterval {
    /// Produced from a negative interval; values are then only accepted when
    /// they come from a subscription tick.
    Subscription,
    /// Produced from an interval of zero; values are never rejected because of
    /// timing.
    Zero,
    /// A positive interval; values must be at least this far apart.
    NonZero(TimeDelta),
}
317
318impl MonitoredItem {
319 pub(super) fn new(request: &CreateMonitoredItem) -> Self {
320 let mut v = Self {
321 id: request.id,
322 item_to_monitor: request.item_to_monitor.clone(),
323 monitoring_mode: request.monitoring_mode,
324 triggered_items: BTreeSet::new(),
325 client_handle: request.client_handle,
326 sampling_interval: parse_sampling_interval(request.sampling_interval),
327 filter: request.filter.clone(),
328 discard_oldest: request.discard_oldest,
329 timestamps_to_return: request.timestamps_to_return,
330 last_data_value: None,
331 sample_skipped_data_value: None,
332 queue_size: request.queue_size,
333 notification_queue: VecDeque::new(),
334 queue_overflow: false,
335 any_new_notification: false,
336 eu_range: request.eu_range,
337 };
338 let now = DateTime::now();
339 if let Some(val) = request.initial_value.as_ref() {
340 v.notify_data_value(val.clone(), &now, true);
341 } else {
342 v.notify_data_value(
343 DataValue {
344 value: Some(Variant::Empty),
345 status: Some(StatusCode::BadWaitingForInitialData),
346 source_timestamp: Some(now),
347 source_picoseconds: None,
348 server_timestamp: Some(now),
349 server_picoseconds: None,
350 },
351 &now,
352 true,
353 );
354 }
355 v
356 }
357
358 pub(super) fn modify(
361 &mut self,
362 info: &ServerInfo,
363 timestamps_to_return: TimestampsToReturn,
364 request: &MonitoredItemModifyRequest,
365 type_tree: &dyn TypeTree,
366 ) -> (Option<EventFilterResult>, StatusCode) {
367 self.timestamps_to_return = timestamps_to_return;
368 let (filter_res, filter) = FilterType::from_filter(
369 request.requested_parameters.filter.clone(),
370 self.eu_range,
371 type_tree,
372 );
373 self.filter = match filter {
374 Ok(f) => f,
375 Err(e) => return (filter_res, e),
376 };
377 let parsed_sampling_interval =
378 sanitize_sampling_interval(info, request.requested_parameters.sampling_interval);
379 self.sampling_interval = parse_sampling_interval(parsed_sampling_interval);
380 self.queue_size =
381 sanitize_queue_size(info, request.requested_parameters.queue_size as usize);
382 self.client_handle = request.requested_parameters.client_handle;
383 self.discard_oldest = request.requested_parameters.discard_oldest;
384
385 if self.notification_queue.len() > self.queue_size {
387 let discard = self.notification_queue.len() - self.queue_size;
389 for _ in 0..discard {
390 if self.discard_oldest {
391 let _ = self.notification_queue.pop_back();
392 } else {
393 let _ = self.notification_queue.pop_front();
394 }
395 }
396 self.notification_queue.shrink_to_fit();
398 }
399 (filter_res, StatusCode::Good)
400 }
401
402 fn filter_by_sampling_interval(
403 &self,
404 old: &DataValue,
405 new: &DataValue,
406 from_subscription_tick: bool,
407 ) -> bool {
408 let (Some(old), Some(new)) = (&old.source_timestamp, &new.source_timestamp) else {
409 return true;
412 };
413
414 let elapsed = new.as_chrono().signed_duration_since(old.as_chrono());
415
416 match self.sampling_interval {
417 SamplingInterval::Subscription => {
418 from_subscription_tick
424 }
425 SamplingInterval::Zero => true,
427 SamplingInterval::NonZero(interval) => {
428 elapsed >= interval
430 }
431 }
432 }
433
434 pub(super) fn maybe_enqueue_skipped_value(&mut self, now: &DateTime) -> bool {
458 match self.sample_skipped_data_value.take() {
459 Some(value) if value.source_timestamp.is_some_and(|v| v <= *now) => {
460 self.notify_data_value(value, now, true);
461 true
462 }
463 Some(value) => {
464 self.sample_skipped_data_value = Some(value);
466 false
467 }
468 None => false,
469 }
470 }
471
472 pub(super) fn notify_data_value(
473 &mut self,
474 mut value: DataValue,
475 now: &DateTime,
476 from_subscription_tick: bool,
477 ) -> bool {
478 if self.monitoring_mode == MonitoringMode::Disabled {
479 return false;
480 }
481
482 let mut extra_enqueued = false;
483 if let Some(skipped_value) = self.sample_skipped_data_value.take() {
484 if skipped_value
487 .source_timestamp
488 .is_some_and(|v| v <= *now && value.source_timestamp.is_none_or(|v2| v2 >= v))
489 {
490 extra_enqueued = self.notify_data_value(skipped_value, now, false);
493 }
494 }
495
496 if !matches!(self.item_to_monitor.index_range, NumericRange::None) {
497 if let Some(v) = value.value {
498 match v.range_of(&self.item_to_monitor.index_range) {
499 Ok(r) => value.value = Some(r),
500 Err(e) => {
501 value.status = Some(e);
502 value.value = Some(Variant::Empty);
503 }
504 }
505 }
506 }
507
508 let (matches_filter, matches_sampling_interval) =
509 match (&self.last_data_value, &self.filter) {
510 (Some(last_dv), FilterType::DataChangeFilter(filter)) => (
511 filter.is_changed(&value, last_dv),
512 self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
513 ),
514 (Some(last_dv), FilterType::None) => (
515 value.value != last_dv.value,
516 self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
517 ),
518 (None, _) => (true, true),
519 _ => (false, false),
520 };
521
522 if !matches_filter {
523 return extra_enqueued;
524 }
525
526 if !matches_sampling_interval {
529 value.source_timestamp = self
530 .last_data_value
531 .as_ref()
532 .and_then(|dv| dv.source_timestamp)
533 .unwrap_or(*now)
534 .as_chrono()
535 .checked_add_signed(self.sampling_interval_as_time_delta())
536 .map(|v| v.into());
537
538 self.sample_skipped_data_value = Some(value);
539 return true;
542 }
543
544 self.last_data_value = Some(value.clone());
545
546 match self.timestamps_to_return {
547 TimestampsToReturn::Neither | TimestampsToReturn::Invalid => {
548 value.source_timestamp = None;
549 value.source_picoseconds = None;
550 value.server_timestamp = None;
551 value.server_picoseconds = None
552 }
553 TimestampsToReturn::Server => {
554 value.source_timestamp = None;
555 value.source_picoseconds = None;
556 }
557 TimestampsToReturn::Source => {
558 value.server_timestamp = None;
559 value.server_picoseconds = None
560 }
561 TimestampsToReturn::Both => {
562 }
564 }
565
566 let client_handle = self.client_handle;
567 self.enqueue_notification(MonitoredItemNotification {
568 client_handle,
569 value,
570 });
571
572 true
573 }
574
575 pub(super) fn notify_event(&mut self, event: &dyn Event, type_tree: &dyn TypeTree) -> bool {
576 if self.monitoring_mode == MonitoringMode::Disabled {
577 return false;
578 }
579
580 let FilterType::EventFilter(filter) = &self.filter else {
581 return false;
582 };
583
584 let Some(notif) = filter.evaluate(event, self.client_handle, type_tree) else {
585 return false;
586 };
587
588 self.enqueue_notification(notif);
589
590 true
591 }
592
593 fn enqueue_notification(&mut self, notification: impl Into<Notification>) {
594 self.any_new_notification = true;
595 let overflow = self.notification_queue.len() == self.queue_size;
596 if overflow {
597 if self.discard_oldest {
598 self.notification_queue.pop_front();
599 } else {
600 self.notification_queue.pop_back();
601 }
602 }
603
604 let mut notification = notification.into();
605 if overflow {
606 if let Notification::MonitoredItemNotification(n) = &mut notification {
607 n.value.status = Some(n.value.status().set_overflow(true));
608 }
609 self.queue_overflow = true;
610 }
611
612 self.notification_queue.push_back(notification);
613 }
614
615 pub(super) fn add_current_value_to_queue(&mut self) {
616 let last_value = self.notification_queue.front();
618 if let Some(Notification::MonitoredItemNotification(it)) = last_value {
619 if Some(&it.value) == self.last_data_value.as_ref() {
620 return;
621 }
622 }
623
624 let Some(value) = self.last_data_value.as_ref() else {
625 return;
626 };
627
628 self.enqueue_notification(Notification::MonitoredItemNotification(
629 MonitoredItemNotification {
630 client_handle: self.client_handle,
631 value: value.clone(),
632 },
633 ));
634 }
635
636 pub fn has_last_value(&self) -> bool {
638 self.last_data_value.is_some()
639 }
640
641 pub(super) fn has_new_notifications(&mut self) -> bool {
645 let any_new = self.any_new_notification;
646 self.any_new_notification = false;
647 any_new
648 }
649
650 pub(super) fn pop_notification(&mut self) -> Option<Notification> {
651 self.notification_queue.pop_front()
652 }
653
654 pub(super) fn set_triggering(&mut self, items_to_add: &[u32], items_to_remove: &[u32]) {
656 items_to_remove.iter().for_each(|i| {
658 self.triggered_items.remove(i);
659 });
660 items_to_add.iter().for_each(|i| {
661 self.triggered_items.insert(*i);
662 });
663 }
664
665 pub(super) fn remove_dead_trigger(&mut self, id: u32) {
666 self.triggered_items.remove(&id);
667 }
668
669 pub fn is_reporting(&self) -> bool {
671 matches!(self.monitoring_mode, MonitoringMode::Reporting)
672 }
673
674 pub fn is_sampling(&self) -> bool {
676 matches!(
677 self.monitoring_mode,
678 MonitoringMode::Reporting | MonitoringMode::Sampling
679 )
680 }
681
682 pub fn triggered_items(&self) -> &BTreeSet<u32> {
684 &self.triggered_items
685 }
686
687 pub fn has_notifications(&self) -> bool {
689 !self.notification_queue.is_empty()
690 }
691
692 pub fn id(&self) -> u32 {
694 self.id
695 }
696
697 pub fn sampling_interval(&self) -> f64 {
699 match &self.sampling_interval {
700 SamplingInterval::Subscription => -1.0,
701 SamplingInterval::Zero => 0.0,
702 SamplingInterval::NonZero(time_delta) => time_delta
703 .num_microseconds()
704 .map(|v| v as f64 / 1000.0)
705 .unwrap_or(time_delta.num_milliseconds() as f64),
706 }
707 }
708
709 pub fn sampling_interval_as_time_delta(&self) -> TimeDelta {
711 match &self.sampling_interval {
712 SamplingInterval::Subscription => TimeDelta::zero(),
713 SamplingInterval::Zero => TimeDelta::zero(),
714 SamplingInterval::NonZero(time_delta) => *time_delta,
715 }
716 }
717
718 pub fn queue_size(&self) -> usize {
720 self.queue_size
721 }
722
723 pub fn item_to_monitor(&self) -> &ParsedReadValueId {
725 &self.item_to_monitor
726 }
727
728 pub(super) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
729 self.monitoring_mode = monitoring_mode;
730 }
731
732 pub fn monitoring_mode(&self) -> MonitoringMode {
734 self.monitoring_mode
735 }
736
737 pub fn discard_oldest(&self) -> bool {
740 self.discard_oldest
741 }
742
743 pub fn client_handle(&self) -> u32 {
745 self.client_handle
746 }
747}
748
749#[cfg(test)]
750pub(super) mod tests {
751 use chrono::{Duration, TimeDelta, Utc};
752
753 use crate::{
754 node_manager::ParsedReadValueId,
755 subscriptions::monitored_item::{Notification, SamplingInterval},
756 };
757 use opcua_types::{
758 AttributeId, DataChangeFilter, DataChangeTrigger, DataValue, DateTime, Deadband,
759 DeadbandType, MonitoringMode, NodeId, ParsedDataChangeFilter, ReadValueId, StatusCode,
760 Variant,
761 };
762
763 use super::{FilterType, MonitoredItem};
764
765 pub(crate) fn new_monitored_item(
766 id: u32,
767 item_to_monitor: ReadValueId,
768 monitoring_mode: MonitoringMode,
769 filter: FilterType,
770 sampling_interval: SamplingInterval,
771 discard_oldest: bool,
772 initial_value: Option<DataValue>,
773 ) -> MonitoredItem {
774 let mut v = MonitoredItem {
775 id,
776 item_to_monitor: ParsedReadValueId::parse(item_to_monitor).unwrap(),
777 monitoring_mode,
778 triggered_items: Default::default(),
779 client_handle: Default::default(),
780 sampling_interval,
781 filter,
782 discard_oldest,
783 queue_size: 10,
784 notification_queue: Default::default(),
785 queue_overflow: false,
786 timestamps_to_return: opcua_types::TimestampsToReturn::Both,
787 last_data_value: None,
788 sample_skipped_data_value: None,
789 any_new_notification: false,
790 eu_range: None,
791 };
792
793 let now = DateTime::now();
794 if let Some(val) = initial_value {
795 v.notify_data_value(val, &now, true);
796 } else {
797 let now = DateTime::now();
798 v.notify_data_value(
799 DataValue {
800 value: Some(Variant::Empty),
801 status: Some(StatusCode::BadWaitingForInitialData),
802 source_timestamp: Some(now),
803 source_picoseconds: None,
804 server_timestamp: Some(now),
805 server_picoseconds: None,
806 },
807 &now,
808 true,
809 );
810 }
811
812 v
813 }
814
/// Walks a data change filter through the `Status`, `StatusValue` and
/// `StatusValueTimestamp` triggers.
#[test]
fn data_change_filter() {
    let mut filter = ParsedDataChangeFilter::parse(
        DataChangeFilter {
            trigger: DataChangeTrigger::Status,
            deadband_type: DeadbandType::None as u32,
            deadband_value: 0f64,
        },
        None,
    )
    .unwrap();

    let mut v1 = DataValue {
        value: None,
        status: None,
        source_timestamp: None,
        source_picoseconds: None,
        server_timestamp: None,
        server_picoseconds: None,
    };
    let mut v2 = v1.clone();

    // Two identical, empty values: nothing changed.
    assert!(!filter.is_changed(&v1, &v2));

    // Only v1 has a status now.
    v1.status = Some(StatusCode::Good);
    assert!(filter.is_changed(&v1, &v2));

    // Statuses agree again.
    v2.status = Some(StatusCode::Good);
    assert!(!filter.is_changed(&v1, &v2));

    // The value differs, but the trigger only looks at the status.
    v1.value = Some(Variant::Boolean(true));
    assert!(!filter.is_changed(&v1, &v2));

    // With the value included in the trigger the difference is picked up.
    filter.trigger = DataChangeTrigger::StatusValue;
    assert!(filter.is_changed(&v1, &v2));

    // Values agree again.
    v2.value = Some(Variant::Boolean(true));
    assert!(!filter.is_changed(&v1, &v2));

    // Including the timestamp does not matter while both are unset ...
    filter.trigger = DataChangeTrigger::StatusValueTimestamp;
    assert!(!filter.is_changed(&v1, &v2));

    // ... but does once only one side has a source timestamp.
    let now = DateTime::now();
    v1.source_timestamp = Some(now);
    assert!(filter.is_changed(&v1, &v2));
}
873
/// An absolute deadband of 1 only reports changes strictly larger than 1.
#[test]
fn data_change_deadband_abs() {
    let filter = ParsedDataChangeFilter::parse(
        DataChangeFilter {
            trigger: DataChangeTrigger::StatusValue,
            deadband_type: DeadbandType::Absolute as u32,
            deadband_value: 1f64,
        },
        None,
    )
    .unwrap();

    let v1 = DataValue {
        value: Some(Variant::Double(10f64)),
        status: None,
        source_timestamp: None,
        source_picoseconds: None,
        server_timestamp: None,
        server_picoseconds: None,
    };
    let mut v2 = v1.clone();

    // (candidate value, whether it counts as a change relative to 10.0)
    let cases = [
        (10f64, false),
        (10.9f64, false),
        (11f64, false),
        (11.00001f64, true),
    ];
    for (candidate, expect_changed) in cases {
        v2.value = Some(Variant::Double(candidate));
        assert_eq!(filter.is_changed(&v1, &v2), expect_changed);
    }
}
917
/// Exercises the interplay of deadband filter, sampling interval and
/// monitoring mode on a single item.
#[test]
fn monitored_item_filter() {
    let start = Utc::now();
    let mut item = new_monitored_item(
        1,
        ReadValueId {
            node_id: NodeId::null(),
            attribute_id: AttributeId::Value as u32,
            ..Default::default()
        },
        MonitoringMode::Reporting,
        FilterType::DataChangeFilter(ParsedDataChangeFilter {
            trigger: DataChangeTrigger::StatusValue,
            deadband: Deadband::Absolute(0.9),
        }),
        SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
        true,
        Some(DataValue::new_at(1.0, start.into())),
    );

    // Every call below uses the start time as "now".
    let now: DateTime = start.into();
    // Timestamp `ms` milliseconds after the start.
    let at = |ms: i64| -> DateTime { (start + Duration::try_milliseconds(ms).unwrap()).into() };

    // Outside the deadband but only 50ms after the initial value: deferred.
    assert!(item.notify_data_value(DataValue::new_at(2.0, at(50)), &now, false));
    assert_eq!(1, item.notification_queue.len());
    assert!(item.sample_skipped_data_value.is_some());

    // Within the deadband of the last reported value (1.0).
    assert!(!item.notify_data_value(DataValue::new_at(1.5, at(100)), &now, false));

    // A disabled item ignores everything.
    item.set_monitoring_mode(MonitoringMode::Disabled);
    assert!(!item.notify_data_value(DataValue::new_at(3.0, at(250)), &now, false));
    item.set_monitoring_mode(MonitoringMode::Reporting);

    // Outside the deadband and a full interval after the initial value.
    assert!(item.notify_data_value(DataValue::new_at(2.0, at(100)), &now, false));

    // Within the deadband of 2.0.
    assert!(!item.notify_data_value(DataValue::new_at(2.5, at(200)), &now, false));

    // Outside the deadband again, and late enough.
    assert!(item.notify_data_value(DataValue::new_at(3.0, at(250)), &now, false));

    // Initial value plus the two accepted values.
    assert_eq!(item.notification_queue.len(), 3);
}
999
/// Fills a queue of size 5 and checks that one more value evicts the oldest
/// entry and is flagged with the overflow bit.
#[test]
fn monitored_item_overflow() {
    let start = Utc::now();
    let mut item = new_monitored_item(
        1,
        ReadValueId {
            node_id: NodeId::null(),
            attribute_id: AttributeId::Value as u32,
            ..Default::default()
        },
        MonitoringMode::Reporting,
        FilterType::None,
        SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
        true,
        Some(DataValue::new_at(0, start.into())),
    );
    let now: DateTime = start.into();
    // Timestamp `ms` milliseconds after the start.
    let at = |ms: i64| -> DateTime { (start + Duration::try_milliseconds(ms).unwrap()).into() };

    item.queue_size = 5;

    // Values 1..=4, 100ms apart, fill the queue next to the initial value.
    for step in 1..=4i32 {
        assert!(item.notify_data_value(
            DataValue::new_at(step, at(100 * i64::from(step))),
            &now,
            false
        ));
    }
    assert_eq!(item.notification_queue.len(), 5);

    // One more value overflows the queue.
    assert!(item.notify_data_value(DataValue::new_at(5, at(600)), &now, false));
    assert_eq!(item.notification_queue.len(), 5);

    // The initial value (0) is gone; 1..=5 remain in order.
    for (expected, notif) in (1i32..).zip(item.notification_queue.drain(..)) {
        let Notification::MonitoredItemNotification(n) = &notif else {
            panic!("Wrong notification type");
        };
        let Some(Variant::Int32(v)) = &n.value.value else {
            panic!("Wrong value type");
        };
        assert_eq!(*v, expected);

        // Only the value that caused the overflow carries the overflow bit.
        let expected_status = if expected == 5 {
            StatusCode::Good.set_overflow(true)
        } else {
            StatusCode::Good
        };
        assert_eq!(n.value.status, Some(expected_status));
    }
}
1055
/// Checks that values arriving before the sampling interval has elapsed are
/// deferred and delivered later.
#[test]
fn monitored_item_delayed_sample() {
    let start = Utc::now();
    let mut item = new_monitored_item(
        1,
        ReadValueId {
            node_id: NodeId::null(),
            attribute_id: AttributeId::Value as u32,
            ..Default::default()
        },
        MonitoringMode::Reporting,
        FilterType::None,
        SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
        true,
        Some(DataValue::new_at(0, start.into())),
    );
    // Timestamp `ms` milliseconds after the start.
    let at = |ms: i64| -> DateTime { (start + TimeDelta::milliseconds(ms)).into() };
    let t0: DateTime = start.into();

    // 50ms in: too early, so the value is deferred rather than enqueued.
    let t = at(50);
    assert!(item.notify_data_value(DataValue::new_at(1, t), &t, false));
    assert_eq!(item.notification_queue.len(), 1);
    assert!(item.sample_skipped_data_value.is_some());

    // A value at 100ms, with "now" still at the start: the deferred value is
    // not due and is dropped, the new value is enqueued.
    assert!(item.notify_data_value(DataValue::new_at(2, at(100)), &t0, false));
    assert!(item.sample_skipped_data_value.is_none());
    assert_eq!(item.notification_queue.len(), 2);
    item.notification_queue.drain(..);

    // 150ms: only 50ms after the last reported value, deferred again.
    let t = at(150);
    assert!(item.notify_data_value(DataValue::new_at(3, t), &t, false));
    assert_eq!(item.notification_queue.len(), 0);
    assert!(item.sample_skipped_data_value.is_some());

    // 300ms: the deferred value is due and goes out first, then the new one.
    let t = at(300);
    assert!(item.notify_data_value(DataValue::new_at(4, t), &t, false));
    assert!(item.sample_skipped_data_value.is_none());
    assert_eq!(item.notification_queue.len(), 2);
    item.notification_queue.drain(..);

    // 350ms: deferred once more.
    let t = at(350);
    assert!(item.notify_data_value(DataValue::new_at(5, t), &t, false));
    assert!(item.sample_skipped_data_value.is_some());
    assert_eq!(item.notification_queue.len(), 0);

    // At 400ms the deferred value is due and gets flushed explicitly.
    assert!(item.maybe_enqueue_skipped_value(&at(400)));
    assert_eq!(item.notification_queue.len(), 1);
    assert!(item.sample_skipped_data_value.is_none());
}
1112}