opcua_server/subscriptions/
monitored_item.rs

1use std::collections::{BTreeSet, VecDeque};
2
3use chrono::TimeDelta;
4use opcua_nodes::{Event, ParsedEventFilter, TypeTree};
5use tracing::{error, warn};
6
7use super::MonitoredItemHandle;
8use crate::{info::ServerInfo, node_manager::ParsedReadValueId};
9use opcua_types::{
10    match_extension_object_owned, DataChangeFilter, DataValue, DateTime, EventFieldList,
11    EventFilter, EventFilterResult, ExtensionObject, MonitoredItemCreateRequest,
12    MonitoredItemModifyRequest, MonitoredItemNotification, MonitoringMode, NumericRange,
13    ParsedDataChangeFilter, StatusCode, TimestampsToReturn, Variant,
14};
15
16#[derive(Debug, Clone, PartialEq)]
17pub(crate) enum Notification {
18    MonitoredItemNotification(MonitoredItemNotification),
19    Event(EventFieldList),
20}
21
22impl From<MonitoredItemNotification> for Notification {
23    fn from(v: MonitoredItemNotification) -> Self {
24        Notification::MonitoredItemNotification(v)
25    }
26}
27
28impl From<EventFieldList> for Notification {
29    fn from(v: EventFieldList) -> Self {
30        Notification::Event(v)
31    }
32}
33
34fn parse_sampling_interval(int: f64) -> SamplingInterval {
35    match int {
36        ..0.0 => SamplingInterval::Subscription,
37        0.0 => SamplingInterval::Zero,
38        _ => {
39            if !int.is_finite() {
40                warn!("Invalid sampling interval {}, using maximum", int);
41                return SamplingInterval::NonZero(TimeDelta::MAX);
42            }
43            let raw = int * 1000.0;
44            SamplingInterval::NonZero(if raw > i64::MAX as f64 {
45                TimeDelta::MAX
46            } else {
47                TimeDelta::microseconds(raw as i64)
48            })
49        }
50    }
51}
52
53#[derive(Debug, Clone)]
54/// Parsed filter type for a monitored item.
55pub enum FilterType {
56    None,
57    DataChangeFilter(ParsedDataChangeFilter),
58    EventFilter(ParsedEventFilter),
59}
60
61impl FilterType {
62    /// Try to create a filter from an extension object, returning
63    /// an `EventFilterResult` if the filter is for events.
64    pub fn from_filter(
65        filter: ExtensionObject,
66        eu_range: Option<(f64, f64)>,
67        type_tree: &dyn TypeTree,
68    ) -> (Option<EventFilterResult>, Result<FilterType, StatusCode>) {
69        // Check if the filter is a supported filter type
70        if filter.is_null() {
71            return (None, Ok(FilterType::None));
72        }
73
74        match_extension_object_owned!(filter,
75            v: DataChangeFilter => {
76                let res = ParsedDataChangeFilter::parse(v, eu_range);
77                (None, res.map(FilterType::DataChangeFilter))
78            },
79            v: EventFilter => {
80                let (res, filter_res) = ParsedEventFilter::new(v, type_tree);
81                (Some(res), filter_res.map(FilterType::EventFilter))
82            },
83            _ => {
84                error!(
85                    "Requested data filter type is not supported: {}",
86                    filter
87                        .body
88                        .as_ref()
89                        .map(|b| b.type_name())
90                        .unwrap_or("Unknown")
91                );
92                (None, Err(StatusCode::BadFilterNotAllowed))
93            }
94        )
95    }
96}
97
98#[derive(Debug)]
99/// Container for a request to create a single monitored item.
100pub struct CreateMonitoredItem {
101    id: u32,
102    subscription_id: u32,
103    item_to_monitor: ParsedReadValueId,
104    monitoring_mode: MonitoringMode,
105    client_handle: u32,
106    discard_oldest: bool,
107    queue_size: usize,
108    sampling_interval: f64,
109    initial_value: Option<DataValue>,
110    status_code: StatusCode,
111    filter: FilterType,
112    filter_res: Option<EventFilterResult>,
113    timestamps_to_return: TimestampsToReturn,
114    eu_range: Option<(f64, f64)>,
115}
116
117/// Takes the requested sampling interval value supplied by client and ensures it is within
118/// the range supported by the server
119fn sanitize_sampling_interval(info: &ServerInfo, requested_sampling_interval: f64) -> f64 {
120    if requested_sampling_interval < 0.0 || !requested_sampling_interval.is_finite() {
121        // From spec "any negative number is interpreted as -1"
122        // -1 means monitored item's sampling interval defaults to the subscription's publishing interval
123        -1.0
124    } else if requested_sampling_interval == 0.0
125        || requested_sampling_interval < info.config.limits.subscriptions.min_sampling_interval_ms
126    {
127        info.config.limits.subscriptions.min_sampling_interval_ms
128    } else {
129        requested_sampling_interval
130    }
131}
132
133/// Takes the requested queue size and ensures it is within the range supported by the server
134fn sanitize_queue_size(info: &ServerInfo, requested_queue_size: usize) -> usize {
135    if requested_queue_size == 0 || requested_queue_size == 1 {
136        // For data monitored items 0 -> 1
137        // Future - for event monitored items, queue size should be the default queue size for event notifications
138        1
139    // Future - for event monitored items, the minimum queue size the server requires for event notifications
140    } else if requested_queue_size
141        > info
142            .config
143            .limits
144            .subscriptions
145            .max_monitored_item_queue_size
146    {
147        info.config
148            .limits
149            .subscriptions
150            .max_monitored_item_queue_size
151    // Future - for event monitored items MaxUInt32 returns the maximum queue size the server support
152    // for event notifications
153    } else {
154        requested_queue_size
155    }
156}
157
158impl CreateMonitoredItem {
159    pub(crate) fn new(
160        req: MonitoredItemCreateRequest,
161        id: u32,
162        sub_id: u32,
163        info: &ServerInfo,
164        timestamps_to_return: TimestampsToReturn,
165        type_tree: &dyn TypeTree,
166        eu_range: Option<(f64, f64)>,
167    ) -> Self {
168        let (filter_res, filter) =
169            FilterType::from_filter(req.requested_parameters.filter, eu_range, type_tree);
170        let sampling_interval =
171            sanitize_sampling_interval(info, req.requested_parameters.sampling_interval);
172        let queue_size = sanitize_queue_size(info, req.requested_parameters.queue_size as usize);
173
174        let (filter, mut status) = match filter {
175            Ok(s) => (s, StatusCode::BadNodeIdUnknown),
176            Err(e) => (FilterType::None, e),
177        };
178
179        let item_to_monitor = match ParsedReadValueId::parse(req.item_to_monitor) {
180            Ok(r) => r,
181            Err(e) => {
182                status = e;
183                ParsedReadValueId::null()
184            }
185        };
186
187        Self {
188            id,
189            subscription_id: sub_id,
190            item_to_monitor,
191            monitoring_mode: req.monitoring_mode,
192            client_handle: req.requested_parameters.client_handle,
193            discard_oldest: req.requested_parameters.discard_oldest,
194            queue_size,
195            sampling_interval,
196            initial_value: None,
197            status_code: status,
198            filter,
199            timestamps_to_return,
200            filter_res,
201            eu_range,
202        }
203    }
204
205    /// Get the monitored item handle of this create request.
206    pub fn handle(&self) -> MonitoredItemHandle {
207        MonitoredItemHandle {
208            monitored_item_id: self.id,
209            subscription_id: self.subscription_id,
210        }
211    }
212
213    /// Set the initial value of the monitored item.
214    pub fn set_initial_value(&mut self, value: DataValue) {
215        self.initial_value = Some(value);
216    }
217
218    /// Set the status of the monitored item create request.
219    /// If this is an error after all node managers have been evulated, the
220    /// monitored item will not be created on the server.
221    ///
222    /// Note: Only consider a monitored item to be created if this is set to a
223    /// `Good` status code.
224    pub fn set_status(&mut self, status: StatusCode) {
225        self.status_code = status;
226    }
227
228    /// Attribute to monitor.
229    pub fn item_to_monitor(&self) -> &ParsedReadValueId {
230        &self.item_to_monitor
231    }
232
233    /// Requested monitoring mode.
234    pub fn monitoring_mode(&self) -> MonitoringMode {
235        self.monitoring_mode
236    }
237
238    /// Requested sampling interval in milliseconds.
239    pub fn sampling_interval(&self) -> f64 {
240        self.sampling_interval
241    }
242
243    /// Requested queue size.
244    pub fn queue_size(&self) -> usize {
245        self.queue_size
246    }
247
248    /// Requested filter type.
249    pub fn filter(&self) -> &FilterType {
250        &self.filter
251    }
252
253    /// Revise the queue size, setting it equal to the given `queue_size` if it is smaller
254    /// or if the requested queue size is 0.
255    pub fn revise_queue_size(&mut self, queue_size: usize) {
256        if queue_size < self.queue_size && queue_size > 0 || self.queue_size == 0 {
257            self.queue_size = queue_size;
258        }
259    }
260
261    /// Revise the sampling interval, settign it equal to the given `sampling_interval` if
262    /// it is larger.
263    pub fn revise_sampling_interval(&mut self, sampling_interval: f64) {
264        if sampling_interval < self.sampling_interval && sampling_interval > 0.0
265            || self.sampling_interval == 0.0
266        {
267            self.sampling_interval = sampling_interval;
268        }
269    }
270
271    /// Requested timestamps to return.
272    pub fn timestamps_to_return(&self) -> TimestampsToReturn {
273        self.timestamps_to_return
274    }
275
276    /// Get the current result status code.
277    pub fn status_code(&self) -> StatusCode {
278        self.status_code
279    }
280
281    pub(crate) fn filter_res(&self) -> Option<&EventFilterResult> {
282        self.filter_res.as_ref()
283    }
284}
285
286#[derive(Debug)]
287/// State of an active monitored item on the server.
288pub struct MonitoredItem {
289    id: u32,
290    item_to_monitor: ParsedReadValueId,
291    monitoring_mode: MonitoringMode,
292    // Triggered items are other monitored items in the same subscription which are reported if this
293    // monitored item changes.
294    triggered_items: BTreeSet<u32>,
295    client_handle: u32,
296    sampling_interval: SamplingInterval,
297    filter: FilterType,
298    discard_oldest: bool,
299    queue_size: usize,
300    notification_queue: VecDeque<Notification>,
301    queue_overflow: bool,
302    timestamps_to_return: TimestampsToReturn,
303    last_data_value: Option<DataValue>,
304    /// Value skipped due to sampling interval, we keep these
305    /// so that we can generate a new notification later.
306    sample_skipped_data_value: Option<DataValue>,
307    any_new_notification: bool,
308    eu_range: Option<(f64, f64)>,
309}
310
311#[derive(Debug)]
312pub(super) enum SamplingInterval {
313    Subscription,
314    Zero,
315    NonZero(TimeDelta),
316}
317
318impl MonitoredItem {
319    pub(super) fn new(request: &CreateMonitoredItem) -> Self {
320        let mut v = Self {
321            id: request.id,
322            item_to_monitor: request.item_to_monitor.clone(),
323            monitoring_mode: request.monitoring_mode,
324            triggered_items: BTreeSet::new(),
325            client_handle: request.client_handle,
326            sampling_interval: parse_sampling_interval(request.sampling_interval),
327            filter: request.filter.clone(),
328            discard_oldest: request.discard_oldest,
329            timestamps_to_return: request.timestamps_to_return,
330            last_data_value: None,
331            sample_skipped_data_value: None,
332            queue_size: request.queue_size,
333            notification_queue: VecDeque::new(),
334            queue_overflow: false,
335            any_new_notification: false,
336            eu_range: request.eu_range,
337        };
338        let now = DateTime::now();
339        if let Some(val) = request.initial_value.as_ref() {
340            v.notify_data_value(val.clone(), &now, true);
341        } else {
342            v.notify_data_value(
343                DataValue {
344                    value: Some(Variant::Empty),
345                    status: Some(StatusCode::BadWaitingForInitialData),
346                    source_timestamp: Some(now),
347                    source_picoseconds: None,
348                    server_timestamp: Some(now),
349                    server_picoseconds: None,
350                },
351                &now,
352                true,
353            );
354        }
355        v
356    }
357
358    /// Modifies the existing item with the values of the modify request. On success, the result
359    /// holds the filter result.
360    pub(super) fn modify(
361        &mut self,
362        info: &ServerInfo,
363        timestamps_to_return: TimestampsToReturn,
364        request: &MonitoredItemModifyRequest,
365        type_tree: &dyn TypeTree,
366    ) -> (Option<EventFilterResult>, StatusCode) {
367        self.timestamps_to_return = timestamps_to_return;
368        let (filter_res, filter) = FilterType::from_filter(
369            request.requested_parameters.filter.clone(),
370            self.eu_range,
371            type_tree,
372        );
373        self.filter = match filter {
374            Ok(f) => f,
375            Err(e) => return (filter_res, e),
376        };
377        let parsed_sampling_interval =
378            sanitize_sampling_interval(info, request.requested_parameters.sampling_interval);
379        self.sampling_interval = parse_sampling_interval(parsed_sampling_interval);
380        self.queue_size =
381            sanitize_queue_size(info, request.requested_parameters.queue_size as usize);
382        self.client_handle = request.requested_parameters.client_handle;
383        self.discard_oldest = request.requested_parameters.discard_oldest;
384
385        // Shrink / grow the notification queue to the new threshold
386        if self.notification_queue.len() > self.queue_size {
387            // Discard old notifications
388            let discard = self.notification_queue.len() - self.queue_size;
389            for _ in 0..discard {
390                if self.discard_oldest {
391                    let _ = self.notification_queue.pop_back();
392                } else {
393                    let _ = self.notification_queue.pop_front();
394                }
395            }
396            // Shrink the queue
397            self.notification_queue.shrink_to_fit();
398        }
399        (filter_res, StatusCode::Good)
400    }
401
402    fn filter_by_sampling_interval(
403        &self,
404        old: &DataValue,
405        new: &DataValue,
406        from_subscription_tick: bool,
407    ) -> bool {
408        let (Some(old), Some(new)) = (&old.source_timestamp, &new.source_timestamp) else {
409            // Always include measurements without source timestamp, we don't know enough about these,
410            // assume the server implementation did filtering elsewhere.
411            return true;
412        };
413
414        let elapsed = new.as_chrono().signed_duration_since(old.as_chrono());
415
416        match self.sampling_interval {
417            SamplingInterval::Subscription => {
418                // If the sampling interval is set to subscription, we use the subscription's
419                // sampling interval, so we should never allow notifications through unless
420                // it's due to a subscription tick.
421                // This means we always store the sampled value in `sample_skipped_data_value`,
422                // then we can use it later if we get a subscription tick.
423                from_subscription_tick
424            }
425            // If the sampling interval is set to zero, we always allow notifications through.
426            SamplingInterval::Zero => true,
427            SamplingInterval::NonZero(interval) => {
428                // If the sampling interval is set to non-zero, we use that.
429                elapsed >= interval
430            }
431        }
432    }
433
434    /// Enqueue a value skipped due to sampling interval,
435    /// if its new timestamp is in the past.
436    ///
437    /// Effectively what this mechanism does is to delay a notification
438    /// if it arrives too early. I.e. with a sampling interval of 100ms, we get
439    /// a notification at 0ms, and at 50ms. The second one is skipped, but if we don't
440    /// get any new notifications until after 100ms, we will send the 50ms notification with
441    /// timestamp equal to 100ms.
442    ///
443    /// This specifically avoids situations where two value changes arrive quickly,
444    /// and then we get no new value changes for a long time. In this case,
445    /// for the client it would appear as if the value changed at 0ms, and then
446    /// was held constant for a long time.
447    ///
448    /// Instead, we want it to appear as if we actually sampled the value at 0ms and
449    /// at 100ms, even if we actually don't sample at all.
450    ///
451    /// A corner case occurs if a new value arrives at exactly 100ms, in which case
452    /// we discard the previous value after all. If a new value arrives at 101ms, it
453    /// would be delayed to 200ms, giving the appearance of regular samples at 100ms intervals,
454    /// even at the cost of delaying the actual update by a little bit.
455    ///
456    /// Users that want to avoid this should just set the sampling interval to 0.
457    pub(super) fn maybe_enqueue_skipped_value(&mut self, now: &DateTime) -> bool {
458        match self.sample_skipped_data_value.take() {
459            Some(value) if value.source_timestamp.is_some_and(|v| v <= *now) => {
460                self.notify_data_value(value, now, true);
461                true
462            }
463            Some(value) => {
464                // If there is no new sample, we can keep the last skipped value.
465                self.sample_skipped_data_value = Some(value);
466                false
467            }
468            None => false,
469        }
470    }
471
472    pub(super) fn notify_data_value(
473        &mut self,
474        mut value: DataValue,
475        now: &DateTime,
476        from_subscription_tick: bool,
477    ) -> bool {
478        if self.monitoring_mode == MonitoringMode::Disabled {
479            return false;
480        }
481
482        let mut extra_enqueued = false;
483        if let Some(skipped_value) = self.sample_skipped_data_value.take() {
484            // We may use the skipped value if it is in the past,
485            // and if it is earlier than the value we are currently reporting.
486            if skipped_value
487                .source_timestamp
488                .is_some_and(|v| v <= *now && value.source_timestamp.is_none_or(|v2| v2 >= v))
489            {
490                // We still pass it through regular filtering, so it's not guaranteed to be enqueued,
491                // for example if the sampling interval is `Subscription`.
492                extra_enqueued = self.notify_data_value(skipped_value, now, false);
493            }
494        }
495
496        if !matches!(self.item_to_monitor.index_range, NumericRange::None) {
497            if let Some(v) = value.value {
498                match v.range_of(&self.item_to_monitor.index_range) {
499                    Ok(r) => value.value = Some(r),
500                    Err(e) => {
501                        value.status = Some(e);
502                        value.value = Some(Variant::Empty);
503                    }
504                }
505            }
506        }
507
508        let (matches_filter, matches_sampling_interval) =
509            match (&self.last_data_value, &self.filter) {
510                (Some(last_dv), FilterType::DataChangeFilter(filter)) => (
511                    filter.is_changed(&value, last_dv),
512                    self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
513                ),
514                (Some(last_dv), FilterType::None) => (
515                    value.value != last_dv.value,
516                    self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
517                ),
518                (None, _) => (true, true),
519                _ => (false, false),
520            };
521
522        if !matches_filter {
523            return extra_enqueued;
524        }
525
526        // If we're outside the sampling interval, keep the value for now,
527        // but shift it to the next sample.
528        if !matches_sampling_interval {
529            value.source_timestamp = self
530                .last_data_value
531                .as_ref()
532                .and_then(|dv| dv.source_timestamp)
533                .unwrap_or(*now)
534                .as_chrono()
535                .checked_add_signed(self.sampling_interval_as_time_delta())
536                .map(|v| v.into());
537
538            self.sample_skipped_data_value = Some(value);
539            // We need to return true here, so that the subscription knows it needs to tick this
540            // monitored item later.
541            return true;
542        }
543
544        self.last_data_value = Some(value.clone());
545
546        match self.timestamps_to_return {
547            TimestampsToReturn::Neither | TimestampsToReturn::Invalid => {
548                value.source_timestamp = None;
549                value.source_picoseconds = None;
550                value.server_timestamp = None;
551                value.server_picoseconds = None
552            }
553            TimestampsToReturn::Server => {
554                value.source_timestamp = None;
555                value.source_picoseconds = None;
556            }
557            TimestampsToReturn::Source => {
558                value.server_timestamp = None;
559                value.server_picoseconds = None
560            }
561            TimestampsToReturn::Both => {
562                // DO NOTHING
563            }
564        }
565
566        let client_handle = self.client_handle;
567        self.enqueue_notification(MonitoredItemNotification {
568            client_handle,
569            value,
570        });
571
572        true
573    }
574
575    pub(super) fn notify_event(&mut self, event: &dyn Event, type_tree: &dyn TypeTree) -> bool {
576        if self.monitoring_mode == MonitoringMode::Disabled {
577            return false;
578        }
579
580        let FilterType::EventFilter(filter) = &self.filter else {
581            return false;
582        };
583
584        let Some(notif) = filter.evaluate(event, self.client_handle, type_tree) else {
585            return false;
586        };
587
588        self.enqueue_notification(notif);
589
590        true
591    }
592
593    fn enqueue_notification(&mut self, notification: impl Into<Notification>) {
594        self.any_new_notification = true;
595        let overflow = self.notification_queue.len() == self.queue_size;
596        if overflow {
597            if self.discard_oldest {
598                self.notification_queue.pop_front();
599            } else {
600                self.notification_queue.pop_back();
601            }
602        }
603
604        let mut notification = notification.into();
605        if overflow {
606            if let Notification::MonitoredItemNotification(n) = &mut notification {
607                n.value.status = Some(n.value.status().set_overflow(true));
608            }
609            self.queue_overflow = true;
610        }
611
612        self.notification_queue.push_back(notification);
613    }
614
615    pub(super) fn add_current_value_to_queue(&mut self) {
616        // Check if the last value is already enqueued
617        let last_value = self.notification_queue.front();
618        if let Some(Notification::MonitoredItemNotification(it)) = last_value {
619            if Some(&it.value) == self.last_data_value.as_ref() {
620                return;
621            }
622        }
623
624        let Some(value) = self.last_data_value.as_ref() else {
625            return;
626        };
627
628        self.enqueue_notification(Notification::MonitoredItemNotification(
629            MonitoredItemNotification {
630                client_handle: self.client_handle,
631                value: value.clone(),
632            },
633        ));
634    }
635
636    /// Return `true` if this item has a stored last value.
637    pub fn has_last_value(&self) -> bool {
638        self.last_data_value.is_some()
639    }
640
641    /// Return `true` if this item has any new notifications.
642    /// Note that this clears the `any_new_notification` flag and should
643    /// be used with care.
644    pub(super) fn has_new_notifications(&mut self) -> bool {
645        let any_new = self.any_new_notification;
646        self.any_new_notification = false;
647        any_new
648    }
649
650    pub(super) fn pop_notification(&mut self) -> Option<Notification> {
651        self.notification_queue.pop_front()
652    }
653
654    /// Adds or removes other monitored items which will be triggered when this monitored item changes
655    pub(super) fn set_triggering(&mut self, items_to_add: &[u32], items_to_remove: &[u32]) {
656        // Spec says to process remove items before adding new ones.
657        items_to_remove.iter().for_each(|i| {
658            self.triggered_items.remove(i);
659        });
660        items_to_add.iter().for_each(|i| {
661            self.triggered_items.insert(*i);
662        });
663    }
664
665    pub(super) fn remove_dead_trigger(&mut self, id: u32) {
666        self.triggered_items.remove(&id);
667    }
668
669    /// Whether this monitored item is currently reporting new values.
670    pub fn is_reporting(&self) -> bool {
671        matches!(self.monitoring_mode, MonitoringMode::Reporting)
672    }
673
674    /// Whether this monitored item is currently storing new values.
675    pub fn is_sampling(&self) -> bool {
676        matches!(
677            self.monitoring_mode,
678            MonitoringMode::Reporting | MonitoringMode::Sampling
679        )
680    }
681
682    /// Items that are triggered by updates to this monitored item.
683    pub fn triggered_items(&self) -> &BTreeSet<u32> {
684        &self.triggered_items
685    }
686
687    /// Whether this monitored item has enqueued notifications.
688    pub fn has_notifications(&self) -> bool {
689        !self.notification_queue.is_empty()
690    }
691
692    /// Monitored item ID.
693    pub fn id(&self) -> u32 {
694        self.id
695    }
696
697    /// Sampling interval.
698    pub fn sampling_interval(&self) -> f64 {
699        match &self.sampling_interval {
700            SamplingInterval::Subscription => -1.0,
701            SamplingInterval::Zero => 0.0,
702            SamplingInterval::NonZero(time_delta) => time_delta
703                .num_microseconds()
704                .map(|v| v as f64 / 1000.0)
705                .unwrap_or(time_delta.num_milliseconds() as f64),
706        }
707    }
708
709    /// Get the sampling interval as a `TimeDelta`.
710    pub fn sampling_interval_as_time_delta(&self) -> TimeDelta {
711        match &self.sampling_interval {
712            SamplingInterval::Subscription => TimeDelta::zero(),
713            SamplingInterval::Zero => TimeDelta::zero(),
714            SamplingInterval::NonZero(time_delta) => *time_delta,
715        }
716    }
717
718    /// Current maximum queue size.
719    pub fn queue_size(&self) -> usize {
720        self.queue_size
721    }
722
723    /// Item being monitored.
724    pub fn item_to_monitor(&self) -> &ParsedReadValueId {
725        &self.item_to_monitor
726    }
727
728    pub(super) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
729        self.monitoring_mode = monitoring_mode;
730    }
731
732    /// Current monitoring mode.
733    pub fn monitoring_mode(&self) -> MonitoringMode {
734        self.monitoring_mode
735    }
736
737    /// Whether oldest or newest values are discarded when the queue
738    /// overflows.
739    pub fn discard_oldest(&self) -> bool {
740        self.discard_oldest
741    }
742
743    /// Get the client defined handle for this monitored item.
744    pub fn client_handle(&self) -> u32 {
745        self.client_handle
746    }
747}
748
749#[cfg(test)]
750pub(super) mod tests {
751    use chrono::{Duration, TimeDelta, Utc};
752
753    use crate::{
754        node_manager::ParsedReadValueId,
755        subscriptions::monitored_item::{Notification, SamplingInterval},
756    };
757    use opcua_types::{
758        AttributeId, DataChangeFilter, DataChangeTrigger, DataValue, DateTime, Deadband,
759        DeadbandType, MonitoringMode, NodeId, ParsedDataChangeFilter, ReadValueId, StatusCode,
760        Variant,
761    };
762
763    use super::{FilterType, MonitoredItem};
764
765    pub(crate) fn new_monitored_item(
766        id: u32,
767        item_to_monitor: ReadValueId,
768        monitoring_mode: MonitoringMode,
769        filter: FilterType,
770        sampling_interval: SamplingInterval,
771        discard_oldest: bool,
772        initial_value: Option<DataValue>,
773    ) -> MonitoredItem {
774        let mut v = MonitoredItem {
775            id,
776            item_to_monitor: ParsedReadValueId::parse(item_to_monitor).unwrap(),
777            monitoring_mode,
778            triggered_items: Default::default(),
779            client_handle: Default::default(),
780            sampling_interval,
781            filter,
782            discard_oldest,
783            queue_size: 10,
784            notification_queue: Default::default(),
785            queue_overflow: false,
786            timestamps_to_return: opcua_types::TimestampsToReturn::Both,
787            last_data_value: None,
788            sample_skipped_data_value: None,
789            any_new_notification: false,
790            eu_range: None,
791        };
792
793        let now = DateTime::now();
794        if let Some(val) = initial_value {
795            v.notify_data_value(val, &now, true);
796        } else {
797            let now = DateTime::now();
798            v.notify_data_value(
799                DataValue {
800                    value: Some(Variant::Empty),
801                    status: Some(StatusCode::BadWaitingForInitialData),
802                    source_timestamp: Some(now),
803                    source_picoseconds: None,
804                    server_timestamp: Some(now),
805                    server_picoseconds: None,
806                },
807                &now,
808                true,
809            );
810        }
811
812        v
813    }
814
815    #[test]
816    fn data_change_filter() {
817        let filter = DataChangeFilter {
818            trigger: DataChangeTrigger::Status,
819            deadband_type: DeadbandType::None as u32,
820            deadband_value: 0f64,
821        };
822        let mut filter = ParsedDataChangeFilter::parse(filter, None).unwrap();
823
824        let mut v1 = DataValue {
825            value: None,
826            status: None,
827            source_timestamp: None,
828            source_picoseconds: None,
829            server_timestamp: None,
830            server_picoseconds: None,
831        };
832
833        let mut v2 = DataValue {
834            value: None,
835            status: None,
836            source_timestamp: None,
837            source_picoseconds: None,
838            server_timestamp: None,
839            server_picoseconds: None,
840        };
841
842        assert!(!filter.is_changed(&v1, &v2));
843
844        // Change v1 status
845        v1.status = Some(StatusCode::Good);
846        assert!(filter.is_changed(&v1, &v2));
847
848        // Change v2 status
849        v2.status = Some(StatusCode::Good);
850        assert!(!filter.is_changed(&v1, &v2));
851
852        // Change value - but since trigger is status, this should not matter
853        v1.value = Some(Variant::Boolean(true));
854        assert!(!filter.is_changed(&v1, &v2));
855
856        // Change trigger to status-value and change should matter
857        filter.trigger = DataChangeTrigger::StatusValue;
858        assert!(filter.is_changed(&v1, &v2));
859
860        // Now values are the same
861        v2.value = Some(Variant::Boolean(true));
862        assert!(!filter.is_changed(&v1, &v2));
863
864        // And for status-value-timestamp
865        filter.trigger = DataChangeTrigger::StatusValueTimestamp;
866        assert!(!filter.is_changed(&v1, &v2));
867
868        // Change timestamps to differ
869        let now = DateTime::now();
870        v1.source_timestamp = Some(now);
871        assert!(filter.is_changed(&v1, &v2));
872    }
873
874    #[test]
875    fn data_change_deadband_abs() {
876        let filter = DataChangeFilter {
877            trigger: DataChangeTrigger::StatusValue,
878            // Abs compare
879            deadband_type: DeadbandType::Absolute as u32,
880            deadband_value: 1f64,
881        };
882        let filter = ParsedDataChangeFilter::parse(filter, None).unwrap();
883
884        let v1 = DataValue {
885            value: Some(Variant::Double(10f64)),
886            status: None,
887            source_timestamp: None,
888            source_picoseconds: None,
889            server_timestamp: None,
890            server_picoseconds: None,
891        };
892
893        let mut v2 = DataValue {
894            value: Some(Variant::Double(10f64)),
895            status: None,
896            source_timestamp: None,
897            source_picoseconds: None,
898            server_timestamp: None,
899            server_picoseconds: None,
900        };
901
902        // Values are the same so deadband should not matter
903        assert!(!filter.is_changed(&v1, &v2));
904
905        // Adjust by less than deadband
906        v2.value = Some(Variant::Double(10.9f64));
907        assert!(!filter.is_changed(&v1, &v2));
908
909        // Adjust by equal deadband
910        v2.value = Some(Variant::Double(11f64));
911        assert!(!filter.is_changed(&v1, &v2));
912
913        // Adjust by equal deadband plus a little bit
914        v2.value = Some(Variant::Double(11.00001f64));
915        assert!(filter.is_changed(&v1, &v2));
916    }
917
918    #[test]
919    fn monitored_item_filter() {
920        let start = Utc::now();
921        let mut item = new_monitored_item(
922            1,
923            ReadValueId {
924                node_id: NodeId::null(),
925                attribute_id: AttributeId::Value as u32,
926                ..Default::default()
927            },
928            MonitoringMode::Reporting,
929            FilterType::DataChangeFilter(ParsedDataChangeFilter {
930                trigger: DataChangeTrigger::StatusValue,
931                // Abs compare
932                deadband: Deadband::Absolute(0.9),
933            }),
934            SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
935            true,
936            Some(DataValue::new_at(1.0, start.into())),
937        );
938
939        // Not within sampling interval
940        assert!(item.notify_data_value(
941            DataValue::new_at(
942                2.0,
943                (start + Duration::try_milliseconds(50).unwrap()).into()
944            ),
945            &start.into(),
946            false
947        ));
948        assert_eq!(1, item.notification_queue.len());
949        assert!(item.sample_skipped_data_value.is_some());
950        // In deadband
951        assert!(!item.notify_data_value(
952            DataValue::new_at(
953                1.5,
954                (start + Duration::try_milliseconds(100).unwrap()).into()
955            ),
956            &start.into(),
957            false
958        ));
959        // Sampling is disabled, don't notify anything.
960        item.set_monitoring_mode(MonitoringMode::Disabled);
961        assert!(!item.notify_data_value(
962            DataValue::new_at(
963                3.0,
964                (start + Duration::try_milliseconds(250).unwrap()).into()
965            ),
966            &start.into(),
967            false
968        ));
969        item.set_monitoring_mode(MonitoringMode::Reporting);
970        // Ok
971        assert!(item.notify_data_value(
972            DataValue::new_at(
973                2.0,
974                (start + Duration::try_milliseconds(100).unwrap()).into()
975            ),
976            &start.into(),
977            false
978        ));
979        // Now in deadband
980        assert!(!item.notify_data_value(
981            DataValue::new_at(
982                2.5,
983                (start + Duration::try_milliseconds(200).unwrap()).into()
984            ),
985            &start.into(),
986            false
987        ));
988        // And outside deadband
989        assert!(item.notify_data_value(
990            DataValue::new_at(
991                3.0,
992                (start + Duration::try_milliseconds(250).unwrap()).into()
993            ),
994            &start.into(),
995            false
996        ));
997        assert_eq!(item.notification_queue.len(), 3);
998    }
999
1000    #[test]
1001    fn monitored_item_overflow() {
1002        let start = Utc::now();
1003        let mut item = new_monitored_item(
1004            1,
1005            ReadValueId {
1006                node_id: NodeId::null(),
1007                attribute_id: AttributeId::Value as u32,
1008                ..Default::default()
1009            },
1010            MonitoringMode::Reporting,
1011            FilterType::None,
1012            SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
1013            true,
1014            Some(DataValue::new_at(0, start.into())),
1015        );
1016        let now = start.into();
1017        item.queue_size = 5;
1018        for i in 0..4 {
1019            assert!(item.notify_data_value(
1020                DataValue::new_at(
1021                    i as i32 + 1,
1022                    (start + Duration::try_milliseconds(100 * i + 100).unwrap()).into(),
1023                ),
1024                &now,
1025                false
1026            ));
1027        }
1028        assert_eq!(item.notification_queue.len(), 5);
1029
1030        assert!(item.notify_data_value(
1031            DataValue::new_at(5, (start + Duration::try_milliseconds(600).unwrap()).into(),),
1032            &now,
1033            false
1034        ));
1035
1036        assert_eq!(item.notification_queue.len(), 5);
1037        let items: Vec<_> = item.notification_queue.drain(..).collect();
1038        for (idx, notif) in items.iter().enumerate() {
1039            let Notification::MonitoredItemNotification(n) = notif else {
1040                panic!("Wrong notification type");
1041            };
1042            let Some(Variant::Int32(v)) = &n.value.value else {
1043                panic!("Wrong value type");
1044            };
1045            // Values should be 1, 2, 3, 4, 5, since the first value 0 was dropped.
1046            assert_eq!(*v, idx as i32 + 1);
1047            // Last status code should have the overflow flag set.
1048            if idx == 4 {
1049                assert_eq!(n.value.status, Some(StatusCode::Good.set_overflow(true)));
1050            } else {
1051                assert_eq!(n.value.status, Some(StatusCode::Good));
1052            }
1053        }
1054    }
1055
1056    #[test]
1057    fn monitored_item_delayed_sample() {
1058        let start = Utc::now();
1059        let mut item = new_monitored_item(
1060            1,
1061            ReadValueId {
1062                node_id: NodeId::null(),
1063                attribute_id: AttributeId::Value as u32,
1064                ..Default::default()
1065            },
1066            MonitoringMode::Reporting,
1067            FilterType::None,
1068            SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
1069            true,
1070            Some(DataValue::new_at(0, start.into())),
1071        );
1072        let t = start + TimeDelta::milliseconds(50);
1073        // Skipped due to sampling interval
1074        assert!(item.notify_data_value(DataValue::new_at(1, t.into()), &t.into(), false));
1075        assert_eq!(item.notification_queue.len(), 1);
1076        assert!(item.sample_skipped_data_value.is_some());
1077
1078        // Now, trigger a new notification after 100 milliseconds, we should delete the skipped value
1079        // and send the new value, since its timestamp is not after the next sample.
1080        // This is to avoid cases where we indefinitely delay notifications.
1081        let t = start + TimeDelta::milliseconds(100);
1082        assert!(item.notify_data_value(DataValue::new_at(2, t.into()), &start.into(), false));
1083        assert!(item.sample_skipped_data_value.is_none());
1084        assert_eq!(item.notification_queue.len(), 2);
1085        item.notification_queue.drain(..);
1086
1087        // Again, skip a value due to sampling interval
1088        let t = start + TimeDelta::milliseconds(150);
1089        assert!(item.notify_data_value(DataValue::new_at(3, t.into()), &t.into(), false));
1090        assert_eq!(item.notification_queue.len(), 0);
1091        assert!(item.sample_skipped_data_value.is_some());
1092
1093        // This time, enqueue a new value far enough in the future that there is "room" for the skipped value.
1094        let t = start + TimeDelta::milliseconds(300);
1095        assert!(item.notify_data_value(DataValue::new_at(4, t.into()), &t.into(), false));
1096        assert!(item.sample_skipped_data_value.is_none());
1097        assert_eq!(item.notification_queue.len(), 2);
1098
1099        item.notification_queue.drain(..);
1100        // A skipped value should also be enqueued on tick.
1101        let t = start + TimeDelta::milliseconds(350);
1102        assert!(item.notify_data_value(DataValue::new_at(5, t.into()), &t.into(), false));
1103        assert!(item.sample_skipped_data_value.is_some());
1104        assert_eq!(item.notification_queue.len(), 0);
1105
1106        let t = start + TimeDelta::milliseconds(400);
1107        // If we tick the item, we should get the skipped value.
1108        assert!(item.maybe_enqueue_skipped_value(&t.into()));
1109        assert_eq!(item.notification_queue.len(), 1);
1110        assert!(item.sample_skipped_data_value.is_none());
1111    }
1112}