use std::collections::{BTreeSet, VecDeque};
use chrono::TimeDelta;
use opcua_nodes::{Event, ParsedEventFilter, TypeTree};
use tracing::{error, warn};
use super::MonitoredItemHandle;
use crate::{info::ServerInfo, node_manager::ParsedReadValueId};
use opcua_types::{
match_extension_object_owned, DataChangeFilter, DataValue, DateTime, EventFieldList,
EventFilter, EventFilterResult, ExtensionObject, MonitoredItemCreateRequest,
MonitoredItemModifyRequest, MonitoredItemNotification, MonitoringMode, NumericRange,
ParsedDataChangeFilter, StatusCode, TimestampsToReturn, Variant,
};
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Notification {
MonitoredItemNotification(MonitoredItemNotification),
Event(EventFieldList),
}
impl From<MonitoredItemNotification> for Notification {
fn from(v: MonitoredItemNotification) -> Self {
Notification::MonitoredItemNotification(v)
}
}
impl From<EventFieldList> for Notification {
fn from(v: EventFieldList) -> Self {
Notification::Event(v)
}
}
fn parse_sampling_interval(int: f64) -> SamplingInterval {
match int {
..0.0 => SamplingInterval::Subscription,
0.0 => SamplingInterval::Zero,
_ => {
if !int.is_finite() {
warn!("Invalid sampling interval {}, using maximum", int);
return SamplingInterval::NonZero(TimeDelta::MAX);
}
let raw = int * 1000.0;
SamplingInterval::NonZero(if raw > i64::MAX as f64 {
TimeDelta::MAX
} else {
TimeDelta::microseconds(raw as i64)
})
}
}
}
#[derive(Debug, Clone)]
pub enum FilterType {
None,
DataChangeFilter(ParsedDataChangeFilter),
EventFilter(ParsedEventFilter),
}
impl FilterType {
pub fn from_filter(
filter: ExtensionObject,
eu_range: Option<(f64, f64)>,
type_tree: &dyn TypeTree,
) -> (Option<EventFilterResult>, Result<FilterType, StatusCode>) {
if filter.is_null() {
return (None, Ok(FilterType::None));
}
match_extension_object_owned!(filter,
v: DataChangeFilter => {
let res = ParsedDataChangeFilter::parse(v, eu_range);
(None, res.map(FilterType::DataChangeFilter))
},
v: EventFilter => {
let (res, filter_res) = ParsedEventFilter::new(v, type_tree);
(Some(res), filter_res.map(FilterType::EventFilter))
},
_ => {
error!(
"Requested data filter type is not supported: {}",
filter
.body
.as_ref()
.map(|b| b.type_name())
.unwrap_or("Unknown")
);
(None, Err(StatusCode::BadFilterNotAllowed))
}
)
}
}
#[derive(Debug)]
pub struct CreateMonitoredItem {
id: u32,
subscription_id: u32,
item_to_monitor: ParsedReadValueId,
monitoring_mode: MonitoringMode,
client_handle: u32,
discard_oldest: bool,
queue_size: usize,
sampling_interval: f64,
initial_value: Option<DataValue>,
status_code: StatusCode,
filter: FilterType,
filter_res: Option<EventFilterResult>,
timestamps_to_return: TimestampsToReturn,
eu_range: Option<(f64, f64)>,
}
fn sanitize_sampling_interval(info: &ServerInfo, requested_sampling_interval: f64) -> f64 {
if requested_sampling_interval < 0.0 || !requested_sampling_interval.is_finite() {
-1.0
} else if requested_sampling_interval == 0.0
|| requested_sampling_interval < info.config.limits.subscriptions.min_sampling_interval_ms
{
info.config.limits.subscriptions.min_sampling_interval_ms
} else {
requested_sampling_interval
}
}
fn sanitize_queue_size(info: &ServerInfo, requested_queue_size: usize) -> usize {
if requested_queue_size == 0 || requested_queue_size == 1 {
1
} else if requested_queue_size
> info
.config
.limits
.subscriptions
.max_monitored_item_queue_size
{
info.config
.limits
.subscriptions
.max_monitored_item_queue_size
} else {
requested_queue_size
}
}
impl CreateMonitoredItem {
pub(crate) fn new(
req: MonitoredItemCreateRequest,
id: u32,
sub_id: u32,
info: &ServerInfo,
timestamps_to_return: TimestampsToReturn,
type_tree: &dyn TypeTree,
eu_range: Option<(f64, f64)>,
) -> Self {
let (filter_res, filter) =
FilterType::from_filter(req.requested_parameters.filter, eu_range, type_tree);
let sampling_interval =
sanitize_sampling_interval(info, req.requested_parameters.sampling_interval);
let queue_size = sanitize_queue_size(info, req.requested_parameters.queue_size as usize);
let (filter, mut status) = match filter {
Ok(s) => (s, StatusCode::BadNodeIdUnknown),
Err(e) => (FilterType::None, e),
};
let item_to_monitor = match ParsedReadValueId::parse(req.item_to_monitor) {
Ok(r) => r,
Err(e) => {
status = e;
ParsedReadValueId::null()
}
};
Self {
id,
subscription_id: sub_id,
item_to_monitor,
monitoring_mode: req.monitoring_mode,
client_handle: req.requested_parameters.client_handle,
discard_oldest: req.requested_parameters.discard_oldest,
queue_size,
sampling_interval,
initial_value: None,
status_code: status,
filter,
timestamps_to_return,
filter_res,
eu_range,
}
}
pub fn handle(&self) -> MonitoredItemHandle {
MonitoredItemHandle {
monitored_item_id: self.id,
subscription_id: self.subscription_id,
}
}
pub fn set_initial_value(&mut self, value: DataValue) {
self.initial_value = Some(value);
}
pub fn set_status(&mut self, status: StatusCode) {
self.status_code = status;
}
pub fn item_to_monitor(&self) -> &ParsedReadValueId {
&self.item_to_monitor
}
pub fn monitoring_mode(&self) -> MonitoringMode {
self.monitoring_mode
}
pub fn sampling_interval(&self) -> f64 {
self.sampling_interval
}
pub fn queue_size(&self) -> usize {
self.queue_size
}
pub fn filter(&self) -> &FilterType {
&self.filter
}
pub fn revise_queue_size(&mut self, queue_size: usize) {
if queue_size < self.queue_size && queue_size > 0 || self.queue_size == 0 {
self.queue_size = queue_size;
}
}
pub fn revise_sampling_interval(&mut self, sampling_interval: f64) {
if sampling_interval < self.sampling_interval && sampling_interval > 0.0
|| self.sampling_interval == 0.0
{
self.sampling_interval = sampling_interval;
}
}
pub fn timestamps_to_return(&self) -> TimestampsToReturn {
self.timestamps_to_return
}
pub fn status_code(&self) -> StatusCode {
self.status_code
}
pub(crate) fn filter_res(&self) -> Option<&EventFilterResult> {
self.filter_res.as_ref()
}
}
#[derive(Debug)]
pub struct MonitoredItem {
id: u32,
item_to_monitor: ParsedReadValueId,
monitoring_mode: MonitoringMode,
triggered_items: BTreeSet<u32>,
client_handle: u32,
sampling_interval: SamplingInterval,
filter: FilterType,
discard_oldest: bool,
queue_size: usize,
notification_queue: VecDeque<Notification>,
queue_overflow: bool,
timestamps_to_return: TimestampsToReturn,
last_data_value: Option<DataValue>,
sample_skipped_data_value: Option<DataValue>,
any_new_notification: bool,
eu_range: Option<(f64, f64)>,
}
#[derive(Debug)]
pub(super) enum SamplingInterval {
Subscription,
Zero,
NonZero(TimeDelta),
}
impl MonitoredItem {
pub(super) fn new(request: &CreateMonitoredItem) -> Self {
let mut v = Self {
id: request.id,
item_to_monitor: request.item_to_monitor.clone(),
monitoring_mode: request.monitoring_mode,
triggered_items: BTreeSet::new(),
client_handle: request.client_handle,
sampling_interval: parse_sampling_interval(request.sampling_interval),
filter: request.filter.clone(),
discard_oldest: request.discard_oldest,
timestamps_to_return: request.timestamps_to_return,
last_data_value: None,
sample_skipped_data_value: None,
queue_size: request.queue_size,
notification_queue: VecDeque::new(),
queue_overflow: false,
any_new_notification: false,
eu_range: request.eu_range,
};
let now = DateTime::now();
if let Some(val) = request.initial_value.as_ref() {
v.notify_data_value(val.clone(), &now, true);
} else {
v.notify_data_value(
DataValue {
value: Some(Variant::Empty),
status: Some(StatusCode::BadWaitingForInitialData),
source_timestamp: Some(now),
source_picoseconds: None,
server_timestamp: Some(now),
server_picoseconds: None,
},
&now,
true,
);
}
v
}
pub(super) fn modify(
&mut self,
info: &ServerInfo,
timestamps_to_return: TimestampsToReturn,
request: &MonitoredItemModifyRequest,
type_tree: &dyn TypeTree,
) -> (Option<EventFilterResult>, StatusCode) {
self.timestamps_to_return = timestamps_to_return;
let (filter_res, filter) = FilterType::from_filter(
request.requested_parameters.filter.clone(),
self.eu_range,
type_tree,
);
self.filter = match filter {
Ok(f) => f,
Err(e) => return (filter_res, e),
};
let parsed_sampling_interval =
sanitize_sampling_interval(info, request.requested_parameters.sampling_interval);
self.sampling_interval = parse_sampling_interval(parsed_sampling_interval);
self.queue_size =
sanitize_queue_size(info, request.requested_parameters.queue_size as usize);
self.client_handle = request.requested_parameters.client_handle;
self.discard_oldest = request.requested_parameters.discard_oldest;
if self.notification_queue.len() > self.queue_size {
let discard = self.notification_queue.len() - self.queue_size;
for _ in 0..discard {
if self.discard_oldest {
let _ = self.notification_queue.pop_back();
} else {
let _ = self.notification_queue.pop_front();
}
}
self.notification_queue.shrink_to_fit();
}
(filter_res, StatusCode::Good)
}
fn filter_by_sampling_interval(
&self,
old: &DataValue,
new: &DataValue,
from_subscription_tick: bool,
) -> bool {
let (Some(old), Some(new)) = (&old.source_timestamp, &new.source_timestamp) else {
return true;
};
let elapsed = new.as_chrono().signed_duration_since(old.as_chrono());
match self.sampling_interval {
SamplingInterval::Subscription => {
from_subscription_tick
}
SamplingInterval::Zero => true,
SamplingInterval::NonZero(interval) => {
elapsed >= interval
}
}
}
pub(super) fn maybe_enqueue_skipped_value(&mut self, now: &DateTime) -> bool {
match self.sample_skipped_data_value.take() {
Some(value) if value.source_timestamp.is_some_and(|v| v <= *now) => {
self.notify_data_value(value, now, true);
true
}
Some(value) => {
self.sample_skipped_data_value = Some(value);
false
}
None => false,
}
}
pub(super) fn notify_data_value(
&mut self,
mut value: DataValue,
now: &DateTime,
from_subscription_tick: bool,
) -> bool {
if self.monitoring_mode == MonitoringMode::Disabled {
return false;
}
let mut extra_enqueued = false;
if let Some(skipped_value) = self.sample_skipped_data_value.take() {
if skipped_value
.source_timestamp
.is_some_and(|v| v <= *now && value.source_timestamp.is_none_or(|v2| v2 >= v))
{
extra_enqueued = self.notify_data_value(skipped_value, now, false);
}
}
if !matches!(self.item_to_monitor.index_range, NumericRange::None) {
if let Some(v) = value.value {
match v.range_of(&self.item_to_monitor.index_range) {
Ok(r) => value.value = Some(r),
Err(e) => {
value.status = Some(e);
value.value = Some(Variant::Empty);
}
}
}
}
let (matches_filter, matches_sampling_interval) =
match (&self.last_data_value, &self.filter) {
(Some(last_dv), FilterType::DataChangeFilter(filter)) => (
filter.is_changed(&value, last_dv),
self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
),
(Some(last_dv), FilterType::None) => (
value.value != last_dv.value,
self.filter_by_sampling_interval(last_dv, &value, from_subscription_tick),
),
(None, _) => (true, true),
_ => (false, false),
};
if !matches_filter {
return extra_enqueued;
}
if !matches_sampling_interval {
value.source_timestamp = self
.last_data_value
.as_ref()
.and_then(|dv| dv.source_timestamp)
.unwrap_or(*now)
.as_chrono()
.checked_add_signed(self.sampling_interval_as_time_delta())
.map(|v| v.into());
self.sample_skipped_data_value = Some(value);
return true;
}
self.last_data_value = Some(value.clone());
match self.timestamps_to_return {
TimestampsToReturn::Neither | TimestampsToReturn::Invalid => {
value.source_timestamp = None;
value.source_picoseconds = None;
value.server_timestamp = None;
value.server_picoseconds = None
}
TimestampsToReturn::Server => {
value.source_timestamp = None;
value.source_picoseconds = None;
}
TimestampsToReturn::Source => {
value.server_timestamp = None;
value.server_picoseconds = None
}
TimestampsToReturn::Both => {
}
}
let client_handle = self.client_handle;
self.enqueue_notification(MonitoredItemNotification {
client_handle,
value,
});
true
}
pub(super) fn notify_event(&mut self, event: &dyn Event, type_tree: &dyn TypeTree) -> bool {
if self.monitoring_mode == MonitoringMode::Disabled {
return false;
}
let FilterType::EventFilter(filter) = &self.filter else {
return false;
};
let Some(notif) = filter.evaluate(event, self.client_handle, type_tree) else {
return false;
};
self.enqueue_notification(notif);
true
}
fn enqueue_notification(&mut self, notification: impl Into<Notification>) {
self.any_new_notification = true;
let overflow = self.notification_queue.len() == self.queue_size;
if overflow {
if self.discard_oldest {
self.notification_queue.pop_front();
} else {
self.notification_queue.pop_back();
}
}
let mut notification = notification.into();
if overflow {
if let Notification::MonitoredItemNotification(n) = &mut notification {
n.value.status = Some(n.value.status().set_overflow(true));
}
self.queue_overflow = true;
}
self.notification_queue.push_back(notification);
}
pub(super) fn add_current_value_to_queue(&mut self) {
let last_value = self.notification_queue.front();
if let Some(Notification::MonitoredItemNotification(it)) = last_value {
if Some(&it.value) == self.last_data_value.as_ref() {
return;
}
}
let Some(value) = self.last_data_value.as_ref() else {
return;
};
self.enqueue_notification(Notification::MonitoredItemNotification(
MonitoredItemNotification {
client_handle: self.client_handle,
value: value.clone(),
},
));
}
pub fn has_last_value(&self) -> bool {
self.last_data_value.is_some()
}
pub(super) fn has_new_notifications(&mut self) -> bool {
let any_new = self.any_new_notification;
self.any_new_notification = false;
any_new
}
pub(super) fn pop_notification(&mut self) -> Option<Notification> {
self.notification_queue.pop_front()
}
pub(super) fn set_triggering(&mut self, items_to_add: &[u32], items_to_remove: &[u32]) {
items_to_remove.iter().for_each(|i| {
self.triggered_items.remove(i);
});
items_to_add.iter().for_each(|i| {
self.triggered_items.insert(*i);
});
}
pub(super) fn remove_dead_trigger(&mut self, id: u32) {
self.triggered_items.remove(&id);
}
pub fn is_reporting(&self) -> bool {
matches!(self.monitoring_mode, MonitoringMode::Reporting)
}
pub fn is_sampling(&self) -> bool {
matches!(
self.monitoring_mode,
MonitoringMode::Reporting | MonitoringMode::Sampling
)
}
pub fn triggered_items(&self) -> &BTreeSet<u32> {
&self.triggered_items
}
pub fn has_notifications(&self) -> bool {
!self.notification_queue.is_empty()
}
pub fn id(&self) -> u32 {
self.id
}
pub fn sampling_interval(&self) -> f64 {
match &self.sampling_interval {
SamplingInterval::Subscription => -1.0,
SamplingInterval::Zero => 0.0,
SamplingInterval::NonZero(time_delta) => time_delta
.num_microseconds()
.map(|v| v as f64 / 1000.0)
.unwrap_or(time_delta.num_milliseconds() as f64),
}
}
pub fn sampling_interval_as_time_delta(&self) -> TimeDelta {
match &self.sampling_interval {
SamplingInterval::Subscription => TimeDelta::zero(),
SamplingInterval::Zero => TimeDelta::zero(),
SamplingInterval::NonZero(time_delta) => *time_delta,
}
}
pub fn queue_size(&self) -> usize {
self.queue_size
}
pub fn item_to_monitor(&self) -> &ParsedReadValueId {
&self.item_to_monitor
}
pub(super) fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) {
self.monitoring_mode = monitoring_mode;
}
pub fn monitoring_mode(&self) -> MonitoringMode {
self.monitoring_mode
}
pub fn discard_oldest(&self) -> bool {
self.discard_oldest
}
pub fn client_handle(&self) -> u32 {
self.client_handle
}
}
#[cfg(test)]
pub(super) mod tests {
use chrono::{Duration, TimeDelta, Utc};
use crate::{
node_manager::ParsedReadValueId,
subscriptions::monitored_item::{Notification, SamplingInterval},
};
use opcua_types::{
AttributeId, DataChangeFilter, DataChangeTrigger, DataValue, DateTime, Deadband,
DeadbandType, MonitoringMode, NodeId, ParsedDataChangeFilter, ReadValueId, StatusCode,
Variant,
};
use super::{FilterType, MonitoredItem};
pub(crate) fn new_monitored_item(
id: u32,
item_to_monitor: ReadValueId,
monitoring_mode: MonitoringMode,
filter: FilterType,
sampling_interval: SamplingInterval,
discard_oldest: bool,
initial_value: Option<DataValue>,
) -> MonitoredItem {
let mut v = MonitoredItem {
id,
item_to_monitor: ParsedReadValueId::parse(item_to_monitor).unwrap(),
monitoring_mode,
triggered_items: Default::default(),
client_handle: Default::default(),
sampling_interval,
filter,
discard_oldest,
queue_size: 10,
notification_queue: Default::default(),
queue_overflow: false,
timestamps_to_return: opcua_types::TimestampsToReturn::Both,
last_data_value: None,
sample_skipped_data_value: None,
any_new_notification: false,
eu_range: None,
};
let now = DateTime::now();
if let Some(val) = initial_value {
v.notify_data_value(val, &now, true);
} else {
let now = DateTime::now();
v.notify_data_value(
DataValue {
value: Some(Variant::Empty),
status: Some(StatusCode::BadWaitingForInitialData),
source_timestamp: Some(now),
source_picoseconds: None,
server_timestamp: Some(now),
server_picoseconds: None,
},
&now,
true,
);
}
v
}
#[test]
fn data_change_filter() {
let filter = DataChangeFilter {
trigger: DataChangeTrigger::Status,
deadband_type: DeadbandType::None as u32,
deadband_value: 0f64,
};
let mut filter = ParsedDataChangeFilter::parse(filter, None).unwrap();
let mut v1 = DataValue {
value: None,
status: None,
source_timestamp: None,
source_picoseconds: None,
server_timestamp: None,
server_picoseconds: None,
};
let mut v2 = DataValue {
value: None,
status: None,
source_timestamp: None,
source_picoseconds: None,
server_timestamp: None,
server_picoseconds: None,
};
assert!(!filter.is_changed(&v1, &v2));
v1.status = Some(StatusCode::Good);
assert!(filter.is_changed(&v1, &v2));
v2.status = Some(StatusCode::Good);
assert!(!filter.is_changed(&v1, &v2));
v1.value = Some(Variant::Boolean(true));
assert!(!filter.is_changed(&v1, &v2));
filter.trigger = DataChangeTrigger::StatusValue;
assert!(filter.is_changed(&v1, &v2));
v2.value = Some(Variant::Boolean(true));
assert!(!filter.is_changed(&v1, &v2));
filter.trigger = DataChangeTrigger::StatusValueTimestamp;
assert!(!filter.is_changed(&v1, &v2));
let now = DateTime::now();
v1.source_timestamp = Some(now);
assert!(filter.is_changed(&v1, &v2));
}
#[test]
fn data_change_deadband_abs() {
let filter = DataChangeFilter {
trigger: DataChangeTrigger::StatusValue,
deadband_type: DeadbandType::Absolute as u32,
deadband_value: 1f64,
};
let filter = ParsedDataChangeFilter::parse(filter, None).unwrap();
let v1 = DataValue {
value: Some(Variant::Double(10f64)),
status: None,
source_timestamp: None,
source_picoseconds: None,
server_timestamp: None,
server_picoseconds: None,
};
let mut v2 = DataValue {
value: Some(Variant::Double(10f64)),
status: None,
source_timestamp: None,
source_picoseconds: None,
server_timestamp: None,
server_picoseconds: None,
};
assert!(!filter.is_changed(&v1, &v2));
v2.value = Some(Variant::Double(10.9f64));
assert!(!filter.is_changed(&v1, &v2));
v2.value = Some(Variant::Double(11f64));
assert!(!filter.is_changed(&v1, &v2));
v2.value = Some(Variant::Double(11.00001f64));
assert!(filter.is_changed(&v1, &v2));
}
#[test]
fn monitored_item_filter() {
let start = Utc::now();
let mut item = new_monitored_item(
1,
ReadValueId {
node_id: NodeId::null(),
attribute_id: AttributeId::Value as u32,
..Default::default()
},
MonitoringMode::Reporting,
FilterType::DataChangeFilter(ParsedDataChangeFilter {
trigger: DataChangeTrigger::StatusValue,
deadband: Deadband::Absolute(0.9),
}),
SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
true,
Some(DataValue::new_at(1.0, start.into())),
);
assert!(item.notify_data_value(
DataValue::new_at(
2.0,
(start + Duration::try_milliseconds(50).unwrap()).into()
),
&start.into(),
false
));
assert_eq!(1, item.notification_queue.len());
assert!(item.sample_skipped_data_value.is_some());
assert!(!item.notify_data_value(
DataValue::new_at(
1.5,
(start + Duration::try_milliseconds(100).unwrap()).into()
),
&start.into(),
false
));
item.set_monitoring_mode(MonitoringMode::Disabled);
assert!(!item.notify_data_value(
DataValue::new_at(
3.0,
(start + Duration::try_milliseconds(250).unwrap()).into()
),
&start.into(),
false
));
item.set_monitoring_mode(MonitoringMode::Reporting);
assert!(item.notify_data_value(
DataValue::new_at(
2.0,
(start + Duration::try_milliseconds(100).unwrap()).into()
),
&start.into(),
false
));
assert!(!item.notify_data_value(
DataValue::new_at(
2.5,
(start + Duration::try_milliseconds(200).unwrap()).into()
),
&start.into(),
false
));
assert!(item.notify_data_value(
DataValue::new_at(
3.0,
(start + Duration::try_milliseconds(250).unwrap()).into()
),
&start.into(),
false
));
assert_eq!(item.notification_queue.len(), 3);
}
#[test]
fn monitored_item_overflow() {
let start = Utc::now();
let mut item = new_monitored_item(
1,
ReadValueId {
node_id: NodeId::null(),
attribute_id: AttributeId::Value as u32,
..Default::default()
},
MonitoringMode::Reporting,
FilterType::None,
SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
true,
Some(DataValue::new_at(0, start.into())),
);
let now = start.into();
item.queue_size = 5;
for i in 0..4 {
assert!(item.notify_data_value(
DataValue::new_at(
i as i32 + 1,
(start + Duration::try_milliseconds(100 * i + 100).unwrap()).into(),
),
&now,
false
));
}
assert_eq!(item.notification_queue.len(), 5);
assert!(item.notify_data_value(
DataValue::new_at(5, (start + Duration::try_milliseconds(600).unwrap()).into(),),
&now,
false
));
assert_eq!(item.notification_queue.len(), 5);
let items: Vec<_> = item.notification_queue.drain(..).collect();
for (idx, notif) in items.iter().enumerate() {
let Notification::MonitoredItemNotification(n) = notif else {
panic!("Wrong notification type");
};
let Some(Variant::Int32(v)) = &n.value.value else {
panic!("Wrong value type");
};
assert_eq!(*v, idx as i32 + 1);
if idx == 4 {
assert_eq!(n.value.status, Some(StatusCode::Good.set_overflow(true)));
} else {
assert_eq!(n.value.status, Some(StatusCode::Good));
}
}
}
#[test]
fn monitored_item_delayed_sample() {
let start = Utc::now();
let mut item = new_monitored_item(
1,
ReadValueId {
node_id: NodeId::null(),
attribute_id: AttributeId::Value as u32,
..Default::default()
},
MonitoringMode::Reporting,
FilterType::None,
SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
true,
Some(DataValue::new_at(0, start.into())),
);
let t = start + TimeDelta::milliseconds(50);
assert!(item.notify_data_value(DataValue::new_at(1, t.into()), &t.into(), false));
assert_eq!(item.notification_queue.len(), 1);
assert!(item.sample_skipped_data_value.is_some());
let t = start + TimeDelta::milliseconds(100);
assert!(item.notify_data_value(DataValue::new_at(2, t.into()), &start.into(), false));
assert!(item.sample_skipped_data_value.is_none());
assert_eq!(item.notification_queue.len(), 2);
item.notification_queue.drain(..);
let t = start + TimeDelta::milliseconds(150);
assert!(item.notify_data_value(DataValue::new_at(3, t.into()), &t.into(), false));
assert_eq!(item.notification_queue.len(), 0);
assert!(item.sample_skipped_data_value.is_some());
let t = start + TimeDelta::milliseconds(300);
assert!(item.notify_data_value(DataValue::new_at(4, t.into()), &t.into(), false));
assert!(item.sample_skipped_data_value.is_none());
assert_eq!(item.notification_queue.len(), 2);
item.notification_queue.drain(..);
let t = start + TimeDelta::milliseconds(350);
assert!(item.notify_data_value(DataValue::new_at(5, t.into()), &t.into(), false));
assert!(item.sample_skipped_data_value.is_some());
assert_eq!(item.notification_queue.len(), 0);
let t = start + TimeDelta::milliseconds(400);
assert!(item.maybe_enqueue_skipped_value(&t.into()));
assert_eq!(item.notification_queue.len(), 1);
assert!(item.sample_skipped_data_value.is_none());
}
}