use std::collections::{HashMap, BTreeSet, VecDeque};
use std::sync::{Arc, RwLock};
use chrono;
use opcua_types::{
*,
status_code::StatusCode,
service_types::{
TimestampsToReturn, NotificationMessage, MonitoredItemCreateRequest, MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoredItemModifyResult,
},
};
use opcua_core::handle::Handle;
use crate::{
constants,
subscriptions::monitored_item::{MonitoredItem, TickResult, Notification},
address_space::AddressSpace,
diagnostics::ServerDiagnostics,
};
/// The state of a subscription; `Subscription::update_state` switches on this value
/// to pick the transition that applies.
#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
pub(crate) enum SubscriptionState {
/// Set by `update_state` when an active subscription's lifetime counter is down to 1.
Closed,
/// Initial state assigned by `Subscription::new`.
Creating,
/// Entered from `Creating` on the first `update_state` call, and re-entered from
/// `Late` / `KeepAlive` when notifications are returned.
Normal,
/// Entered when the publishing timer expired while no publish request was queued.
Late,
/// Entered when the publishing timer expired with nothing to publish after the
/// first message had been sent, or after a keep-alive was returned from `Late`.
KeepAlive,
}
/// Inputs evaluated by `Subscription::update_state` for a single tick.
#[derive(Debug)]
pub(crate) struct SubscriptionStateParams {
/// True when the notification queue is non-empty or the current tick produced a notification.
pub notifications_available: bool,
/// True when more than one notification message is waiting in the queue.
pub more_notifications: bool,
/// Whether a publish request is queued; passed through from the caller of `tick`.
pub publishing_req_queued: bool,
/// Whether the publishing interval elapsed on this tick.
pub publishing_timer_expired: bool,
}
/// The action `Subscription::update_state` asks `handle_state_result` to carry out.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum UpdateStateAction {
/// Nothing is enqueued; a notification produced by the tick is dropped and its
/// sequence number handed back.
None,
/// A keep-alive message is enqueued.
ReturnKeepAlive,
/// The notification produced by the tick (if any) is enqueued.
ReturnNotifications,
/// The subscription just left the `Creating` state.
SubscriptionCreated,
/// The subscription timed out: its monitored items are cleared and a `BadTimeout`
/// status change is enqueued.
SubscriptionExpired,
}
/// Identifies which branch of `Subscription::update_state` handled a call.
///
/// NOTE(review): the numeric suffixes / discriminants presumably mirror the row numbers
/// of the subscription state table in OPC UA Part 4 — confirm against the spec.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum HandledState {
/// No branch matched; nothing changed.
None0 = 0,
/// `Creating` -> `Normal`.
Create3 = 3,
/// `Normal`, publish request received, nothing returned.
Normal4 = 4,
/// `Normal`, publish request received, notifications returned.
Normal5 = 5,
/// `Normal`, timer expired, notifications returned.
IntervalElapsed6 = 6,
/// `Normal`, timer expired, first keep-alive returned.
IntervalElapsed7 = 7,
/// `Normal`, timer expired without a queued request -> `Late`.
IntervalElapsed8 = 8,
/// `Normal`, timer expired with nothing to publish -> `KeepAlive`.
IntervalElapsed9 = 9,
/// `Late`, publish request received, notifications returned -> `Normal`.
Late10 = 10,
/// `Late`, publish request received, keep-alive returned -> `KeepAlive`.
Late11 = 11,
/// `Late`, timer expired again.
Late12 = 12,
/// `KeepAlive`, publish request received.
KeepAlive13 = 13,
/// `KeepAlive`, timer expired, notifications returned -> `Normal`.
KeepAlive14 = 14,
/// `KeepAlive`, timer expired with the keep-alive counter at 1, keep-alive returned.
KeepAlive15 = 15,
/// `KeepAlive`, timer expired, keep-alive counter decremented.
KeepAlive16 = 16,
/// `KeepAlive`, timer expired without a queued request -> `Late`.
KeepAlive17 = 17,
/// Lifetime counter ran down; subscription closed.
Closed27 = 27,
}
/// Outcome of one `Subscription::update_state` call.
#[derive(Debug)]
pub(crate) struct UpdateStateResult {
/// The state-machine branch that handled the call.
pub handled_state: HandledState,
/// What `handle_state_result` should do in response.
pub update_state_action: UpdateStateAction,
}
impl UpdateStateResult {
pub fn new(handled_state: HandledState, update_state_action: UpdateStateAction) -> UpdateStateResult {
UpdateStateResult {
handled_state,
update_state_action,
}
}
}
/// Why `Subscription::tick` was invoked.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum TickReason {
/// A publish request arrived; the publishing interval is never treated as elapsed for this reason.
ReceivePublishRequest,
/// The tick timer fired; the publishing interval is tested (and treated as elapsed while `Creating`).
TickTimerFired,
}
/// A server-side subscription: a set of monitored items plus the state machine and
/// counters that decide when notification / keep-alive messages are queued.
#[derive(Debug, Clone, Serialize)]
pub struct Subscription {
/// Identifier of this subscription.
subscription_id: u32,
/// Publishing interval; converted via `duration_from_ms`, so presumably milliseconds.
publishing_interval: Duration,
/// Value `lifetime_counter` is restored to by `reset_lifetime_counter`.
max_lifetime_counter: u32,
/// Value `keep_alive_counter` is restored to by `reset_keep_alive_counter`.
max_keep_alive_counter: u32,
/// Priority of the subscription.
priority: u8,
/// Monitored items keyed by monitored item id.
monitored_items: HashMap<u32, MonitoredItem>,
/// Current state of the state machine.
state: SubscriptionState,
/// Decremented by `start_publishing_timer`; the subscription closes when
/// `update_state` finds it at 1.
lifetime_counter: u32,
/// Counts down publishing intervals while in the `KeepAlive` state.
keep_alive_counter: u32,
/// Whether a notification or keep-alive has been returned since creation.
first_message_sent: bool,
/// Whether notifications may be published.
publishing_enabled: bool,
/// One-shot flag forwarded to the monitored items on the next tick, then cleared.
resend_data: bool,
/// Source of the sequence number for the next notification message.
sequence_number: Handle,
/// Sequence number of the most recently enqueued message; used to check continuity.
last_sequence_number: u32,
/// Id that will be given to the next successfully created monitored item.
next_monitored_item_id: u32,
/// Time at which the publishing interval was last found to have elapsed.
last_time_publishing_interval_elapsed: DateTimeUtc,
/// Messages waiting to be taken via `take_notification`; not serialized.
#[serde(skip)]
notifications: VecDeque<NotificationMessage>,
/// Shared server diagnostics notified on creation and (optionally) on drop; not serialized.
#[serde(skip)]
diagnostics: Arc<RwLock<ServerDiagnostics>>,
/// When true, `Drop` reports the destruction to `diagnostics`; not serialized.
#[serde(skip)]
diagnostics_on_drop: bool,
}
impl Drop for Subscription {
    /// Reports the destruction of this subscription to the server diagnostics,
    /// unless that reporting has been switched off via `diagnostics_on_drop`.
    fn drop(&mut self) {
        if !self.diagnostics_on_drop {
            return;
        }
        let mut server_diagnostics = trace_write_lock_unwrap!(self.diagnostics);
        server_diagnostics.on_destroy_subscription(self);
    }
}
impl Subscription {
/// Builds a subscription in the `Creating` state and announces it to the server
/// diagnostics.
///
/// `lifetime_counter` and `keep_alive_counter` seed both the running counters and
/// the maxima they are later reset to. Sequence numbers and monitored item ids
/// start at 1.
pub fn new(diagnostics: Arc<RwLock<ServerDiagnostics>>, subscription_id: u32, publishing_enabled: bool, publishing_interval: Duration, lifetime_counter: u32, keep_alive_counter: u32, priority: u8) -> Subscription {
    let monitored_items = HashMap::with_capacity(constants::DEFAULT_MONITORED_ITEM_CAPACITY);
    let created = Subscription {
        subscription_id,
        publishing_interval,
        max_lifetime_counter: lifetime_counter,
        max_keep_alive_counter: keep_alive_counter,
        priority,
        monitored_items,
        state: SubscriptionState::Creating,
        lifetime_counter,
        keep_alive_counter,
        first_message_sent: false,
        publishing_enabled,
        resend_data: false,
        sequence_number: Handle::new(1),
        last_sequence_number: 0,
        next_monitored_item_id: 1,
        last_time_publishing_interval_elapsed: chrono::Utc::now(),
        notifications: VecDeque::with_capacity(100),
        diagnostics,
        diagnostics_on_drop: true,
    };
    {
        // Scope the write lock so it is released before the subscription is returned.
        let mut server_diagnostics = trace_write_lock_unwrap!(created.diagnostics);
        server_diagnostics.on_create_subscription(&created);
    }
    created
}
/// Returns true once the subscription is `Closed` and every queued notification
/// has been taken.
pub(crate) fn ready_to_remove(&self) -> bool {
self.state == SubscriptionState::Closed && self.notifications.is_empty()
}
/// Builds the result returned for a monitored item that could not be created:
/// the supplied status code with every other field zeroed / null.
fn monitored_item_create_error(status_code: StatusCode) -> MonitoredItemCreateResult {
    let filter_result = ExtensionObject::null();
    MonitoredItemCreateResult {
        status_code,
        monitored_item_id: 0,
        revised_sampling_interval: 0.0,
        revised_queue_size: 0,
        filter_result,
    }
}
/// Returns the number of monitored items currently held by the subscription.
pub fn monitored_items_len(&self) -> usize {
self.monitored_items.len()
}
/// Creates one monitored item per request and returns one result per request, in order.
///
/// The lifetime counter is reset first. For each request the checks run in this order,
/// and the first failure produces an error result for that request only:
///
/// 1. the node to monitor must exist in `address_space` (`BadNodeIdUnknown`);
/// 2. `MonitoredItem::new` must accept the request (its status code is returned);
/// 3. the subscription must have room for another item (`BadTooManyMonitoredItems`);
/// 4. the item's filter must validate (its status code is returned).
///
/// `max_monitored_items_per_sub == 0` means "no limit". Otherwise the subscription never
/// holds more than `max_monitored_items_per_sub` items. A monitored item id is only
/// consumed when the item is actually inserted.
pub fn create_monitored_items(&mut self, address_space: &AddressSpace, now: &DateTimeUtc, timestamps_to_return: TimestampsToReturn, items_to_create: &[MonitoredItemCreateRequest], max_monitored_items_per_sub: usize) -> Vec<MonitoredItemCreateResult> {
    self.reset_lifetime_counter();
    items_to_create.iter().map(|item_to_create| {
        if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
            return Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown);
        }

        let monitored_item_id = self.next_monitored_item_id;
        let monitored_item = match MonitoredItem::new(now, monitored_item_id, timestamps_to_return, item_to_create) {
            Ok(monitored_item) => monitored_item,
            Err(status_code) => return Self::monitored_item_create_error(status_code),
        };

        // The limit is on the number of items held *after* insertion, so the current
        // count must be strictly below it (a `<=` test would admit one item too many).
        let at_capacity = max_monitored_items_per_sub != 0
            && self.monitored_items.len() >= max_monitored_items_per_sub;
        if at_capacity {
            return Self::monitored_item_create_error(StatusCode::BadTooManyMonitoredItems);
        }

        // Read the revised values before the item is moved into the map.
        let revised_sampling_interval = monitored_item.sampling_interval();
        let revised_queue_size = monitored_item.queue_size() as u32;
        match monitored_item.validate_filter(address_space) {
            Ok(filter_result) => {
                self.monitored_items.insert(monitored_item_id, monitored_item);
                self.next_monitored_item_id += 1;
                MonitoredItemCreateResult {
                    status_code: StatusCode::Good,
                    monitored_item_id,
                    revised_sampling_interval,
                    revised_queue_size,
                    filter_result,
                }
            }
            Err(status_code) => Self::monitored_item_create_error(status_code),
        }
    }).collect()
}
pub fn modify_monitored_items(&mut self, address_space: &AddressSpace, timestamps_to_return: TimestampsToReturn, items_to_modify: &[MonitoredItemModifyRequest]) -> Vec<MonitoredItemModifyResult> {
self.reset_lifetime_counter();
items_to_modify.iter().map(|item_to_modify| {
match self.monitored_items.get_mut(&item_to_modify.monitored_item_id) {
Some(monitored_item) => {
let modify_result = monitored_item.modify(address_space, timestamps_to_return, item_to_modify);
match modify_result {
Ok(filter_result) => MonitoredItemModifyResult {
status_code: StatusCode::Good,
revised_sampling_interval: monitored_item.sampling_interval(),
revised_queue_size: monitored_item.queue_size() as u32,
filter_result,
},
Err(err) => MonitoredItemModifyResult {
status_code: err,
revised_sampling_interval: 0f64,
revised_queue_size: 0,
filter_result: ExtensionObject::null(),
}
}
}
None => MonitoredItemModifyResult {
status_code: StatusCode::BadMonitoredItemIdInvalid,
revised_sampling_interval: 0f64,
revised_queue_size: 0,
filter_result: ExtensionObject::null(),
}
}
}).collect()
}
/// Changes the monitoring mode of one monitored item.
///
/// Returns `Good` when the item exists, `BadMonitoredItemIdInvalid` otherwise.
pub fn set_monitoring_mode(&mut self, monitored_item_id: u32, monitoring_mode: MonitoringMode) -> StatusCode {
    match self.monitored_items.get_mut(&monitored_item_id) {
        Some(item) => {
            item.set_monitoring_mode(monitoring_mode);
            StatusCode::Good
        }
        None => StatusCode::BadMonitoredItemIdInvalid,
    }
}
/// Removes the listed monitored items and returns one status per id, in order:
/// `Good` when the item existed, `BadMonitoredItemIdInvalid` otherwise.
///
/// The lifetime counter is reset before anything is removed.
pub fn delete_monitored_items(&mut self, items_to_delete: &[u32]) -> Vec<StatusCode> {
    self.reset_lifetime_counter();
    let mut statuses = Vec::with_capacity(items_to_delete.len());
    for id in items_to_delete {
        let status = if self.monitored_items.remove(id).is_some() {
            StatusCode::Good
        } else {
            StatusCode::BadMonitoredItemIdInvalid
        };
        statuses.push(status);
    }
    statuses
}
/// Returns the server handles (monitored item ids) and the client handles of all
/// monitored items as two parallel vectors: entry `n` of each belongs to the same item.
pub fn get_handles(&self) -> (Vec<u32>, Vec<u32>) {
    self.monitored_items
        .values()
        .map(|item| (item.monitored_item_id(), item.client_handle()))
        .unzip()
}
/// Raises the resend-data flag; `tick` forwards it to the monitored items once and then clears it.
pub fn set_resend_data(&mut self) {
self.resend_data = true;
}
/// Reports whether at least one publishing interval has passed since the last time
/// this returned true, and if so records `now` as the new reference point.
fn test_and_set_publishing_interval_elapsed(&mut self, now: &DateTimeUtc) -> bool {
    let interval = super::duration_from_ms(self.publishing_interval);
    let since_last = now.signed_duration_since(self.last_time_publishing_interval_elapsed);
    let has_elapsed = since_last >= interval;
    if has_elapsed {
        self.last_time_publishing_interval_elapsed = *now;
    }
    has_elapsed
}
/// Runs one tick of the subscription: samples the monitored items, feeds the outcome
/// into the state machine and enqueues whatever message the state machine asks for.
///
/// The publishing interval can only be considered elapsed when the tick timer fired;
/// while the subscription is still `Creating` it is always considered elapsed. The
/// state machine is skipped entirely when there is nothing available, the interval has
/// not elapsed and no publish request is queued.
///
/// # Panics
///
/// Panics if the timer fired on a created subscription whose publishing interval
/// is not positive.
pub(crate) fn tick(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, tick_reason: TickReason, publishing_req_queued: bool) {
    let publishing_interval_elapsed = match tick_reason {
        TickReason::ReceivePublishRequest => false,
        TickReason::TickTimerFired if self.state == SubscriptionState::Creating => true,
        TickReason::TickTimerFired if self.publishing_interval <= 0f64 => {
            panic!("Publishing interval should have been revised to min interval")
        }
        TickReason::TickTimerFired => self.test_and_set_publishing_interval_elapsed(now),
    };

    // Monitored items are only sampled once the subscription is up and not yet closed.
    let is_active = !matches!(self.state, SubscriptionState::Closed | SubscriptionState::Creating);
    let notification = if is_active {
        let resend = self.resend_data;
        self.tick_monitored_items(now, address_space, publishing_interval_elapsed, resend)
    } else {
        None
    };
    // The resend request is one-shot, whether or not the items were sampled.
    self.resend_data = false;

    let queued = self.notifications.len();
    let notifications_available = queued > 0 || notification.is_some();
    let more_notifications = queued > 1;

    if !(notifications_available || publishing_interval_elapsed || publishing_req_queued) {
        return;
    }

    let params = SubscriptionStateParams {
        notifications_available,
        more_notifications,
        publishing_req_queued,
        publishing_timer_expired: publishing_interval_elapsed,
    };
    let update_state_result = self.update_state(tick_reason, params);
    trace!("subscription tick - update_state_result = {:?}", update_state_result);
    self.handle_state_result(now, update_state_result, notification);
}
/// Appends a message to the outgoing queue after checking that its sequence number
/// directly follows the previous one (wrapping from `u32::MAX` back to 1).
///
/// # Panics
///
/// Panics when the sequence number is not the expected successor.
fn enqueue_notification(&mut self, notification: NotificationMessage) {
    // `checked_add` yields `None` only at `u32::MAX`, which is where the sequence restarts at 1.
    let expected_sequence_number = self.last_sequence_number.checked_add(1).unwrap_or(1);
    let actual_sequence_number = notification.sequence_number;
    if actual_sequence_number != expected_sequence_number {
        panic!("Notification's sequence number is not sequential, expecting {}, got {}", expected_sequence_number, actual_sequence_number);
    }
    self.last_sequence_number = actual_sequence_number;
    self.notifications.push_back(notification);
}
/// Carries out the action chosen by `update_state`.
///
/// * `None` / `ReturnKeepAlive`: a notification produced by this tick is discarded and
///   its sequence number handed back to the generator; `ReturnKeepAlive` then enqueues
///   a keep-alive message.
/// * `ReturnNotifications`: the notification, if any, is enqueued.
/// * `SubscriptionCreated`: nothing to do.
/// * `SubscriptionExpired`: monitored items are dropped and a `BadTimeout` status
///   change is enqueued.
///
/// # Panics
///
/// Panics if a notification accompanies `SubscriptionCreated` or `SubscriptionExpired`.
fn handle_state_result(&mut self, now: &DateTimeUtc, update_state_result: UpdateStateResult, notification: Option<NotificationMessage>) {
    let action = update_state_result.update_state_action;
    match action {
        UpdateStateAction::None => {
            if let Some(ignored) = notification.as_ref() {
                let notification_sequence_number = ignored.sequence_number;
                // Reuse the number for the next message so the sequence stays gap-free.
                self.sequence_number.set_next(notification_sequence_number);
                debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result);
            }
        }
        UpdateStateAction::ReturnKeepAlive => {
            if let Some(ignored) = notification.as_ref() {
                let notification_sequence_number = ignored.sequence_number;
                // Reuse the number for the keep-alive that is about to be sent.
                self.sequence_number.set_next(notification_sequence_number);
                debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result);
            }
            debug!("Sending keep alive response");
            let keep_alive = NotificationMessage::keep_alive(self.sequence_number.next(), DateTime::from(*now));
            self.enqueue_notification(keep_alive);
        }
        UpdateStateAction::ReturnNotifications => {
            if let Some(message) = notification {
                self.enqueue_notification(message);
            }
        }
        UpdateStateAction::SubscriptionCreated => {
            if notification.is_some() {
                panic!("SubscriptionCreated got a notification");
            }
        }
        UpdateStateAction::SubscriptionExpired => {
            if notification.is_some() {
                panic!("SubscriptionExpired got a notification");
            }
            debug!("Subscription status change to closed / timeout");
            self.monitored_items.clear();
            let status_change = NotificationMessage::status_change(self.sequence_number.next(), DateTime::from(*now), StatusCode::BadTimeout);
            self.enqueue_notification(status_change);
        }
    }
}
/// Removes and returns the oldest queued notification message, or `None` when the queue is empty.
pub(crate) fn take_notification(&mut self) -> Option<NotificationMessage> {
self.notifications.pop_front()
}
/// Advances the subscription state machine by one step.
///
/// Given why the tick happened and the inputs in `p`, this picks at most one
/// transition, applies its side effects (counter resets / decrements, state change,
/// `first_message_sent`) and returns which transition fired together with the action
/// the caller must perform. When no transition applies, `HandledState::None0` with
/// `UpdateStateAction::None` is returned and nothing is modified.
///
/// The `#n` comments refer to the `HandledState` variant returned by each branch.
///
/// # Panics
///
/// Panics when called for a received publish request while `p.publishing_timer_expired`
/// is also set; the two are mutually exclusive.
pub(crate) fn update_state(&mut self, tick_reason: TickReason, p: SubscriptionStateParams) -> UpdateStateResult {
    if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired {
        panic!("Should not be possible for timer to have expired and received publish request at same time")
    }
    {
        use log::Level::Trace;
        if log_enabled!(Trace) {
            trace!(r#"State inputs:
subscription_id: {} / state: {:?}
tick_reason: {:?} / state_params: {:?}
publishing_enabled: {}
keep_alive_counter / lifetime_counter: {} / {}
message_sent: {}"#,
                self.subscription_id, self.state, tick_reason, p,
                self.publishing_enabled,
                self.keep_alive_counter,
                self.lifetime_counter,
                self.first_message_sent);
        }
    }

    // #27 - an active subscription whose lifetime counter has run down expires,
    // regardless of any other input.
    let is_active = matches!(self.state, SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive);
    if is_active && self.lifetime_counter == 1 {
        self.state = SubscriptionState::Closed;
        return UpdateStateResult::new(HandledState::Closed27, UpdateStateAction::SubscriptionExpired);
    }

    // Neither of these inputs is modified before the branch that reads it returns.
    let publish_request = tick_reason == TickReason::ReceivePublishRequest;
    let enabled = self.publishing_enabled;
    // "publishing disabled, or enabled with no notifications available"
    let nothing_to_publish = !enabled || !p.notifications_available;

    match self.state {
        SubscriptionState::Creating => {
            // #3
            self.state = SubscriptionState::Normal;
            self.first_message_sent = false;
            return UpdateStateResult::new(HandledState::Create3, UpdateStateAction::SubscriptionCreated);
        }
        SubscriptionState::Normal => {
            if publish_request && !(enabled && p.more_notifications) {
                // #4
                return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None);
            } else if publish_request && enabled && p.more_notifications {
                // #5
                self.reset_lifetime_counter();
                self.first_message_sent = true;
                return UpdateStateResult::new(HandledState::Normal5, UpdateStateAction::ReturnNotifications);
            } else if p.publishing_timer_expired && p.publishing_req_queued && enabled && p.notifications_available {
                // #6
                self.reset_lifetime_counter();
                self.start_publishing_timer();
                self.first_message_sent = true;
                return UpdateStateResult::new(HandledState::IntervalElapsed6, UpdateStateAction::ReturnNotifications);
            } else if p.publishing_timer_expired && p.publishing_req_queued && !self.first_message_sent && nothing_to_publish {
                // #7
                self.reset_lifetime_counter();
                self.start_publishing_timer();
                self.first_message_sent = true;
                return UpdateStateResult::new(HandledState::IntervalElapsed7, UpdateStateAction::ReturnKeepAlive);
            } else if p.publishing_timer_expired && !p.publishing_req_queued && (!self.first_message_sent || (enabled && p.notifications_available)) {
                // #8
                self.start_publishing_timer();
                self.state = SubscriptionState::Late;
                return UpdateStateResult::new(HandledState::IntervalElapsed8, UpdateStateAction::None);
            } else if p.publishing_timer_expired && self.first_message_sent && nothing_to_publish {
                // #9
                self.start_publishing_timer();
                self.reset_keep_alive_counter();
                self.state = SubscriptionState::KeepAlive;
                return UpdateStateResult::new(HandledState::IntervalElapsed9, UpdateStateAction::None);
            }
        }
        SubscriptionState::Late => {
            if publish_request && enabled && (p.notifications_available || p.more_notifications) {
                // #10
                self.reset_lifetime_counter();
                self.state = SubscriptionState::Normal;
                self.first_message_sent = true;
                return UpdateStateResult::new(HandledState::Late10, UpdateStateAction::ReturnNotifications);
            } else if publish_request && (!enabled || (!p.notifications_available && !p.more_notifications)) {
                // #11
                self.reset_lifetime_counter();
                self.state = SubscriptionState::KeepAlive;
                self.first_message_sent = true;
                return UpdateStateResult::new(HandledState::Late11, UpdateStateAction::ReturnKeepAlive);
            } else if p.publishing_timer_expired {
                // #12
                self.start_publishing_timer();
                return UpdateStateResult::new(HandledState::Late12, UpdateStateAction::None);
            }
        }
        SubscriptionState::KeepAlive => {
            if publish_request {
                // #13
                return UpdateStateResult::new(HandledState::KeepAlive13, UpdateStateAction::None);
            } else if p.publishing_timer_expired && enabled && p.notifications_available && p.publishing_req_queued {
                // #14
                self.first_message_sent = true;
                self.state = SubscriptionState::Normal;
                return UpdateStateResult::new(HandledState::KeepAlive14, UpdateStateAction::ReturnNotifications);
            } else if p.publishing_timer_expired && p.publishing_req_queued && self.keep_alive_counter == 1 && nothing_to_publish {
                // #15 - the keep-alive is due when there is *nothing* to publish, exactly
                // as in #9 and #16. Testing for available notifications here could never
                // match (that case is taken by #14), which left an enabled, idle
                // subscription stuck without ever sending its keep-alive.
                self.start_publishing_timer();
                self.reset_keep_alive_counter();
                return UpdateStateResult::new(HandledState::KeepAlive15, UpdateStateAction::ReturnKeepAlive);
            } else if p.publishing_timer_expired && self.keep_alive_counter > 1 && nothing_to_publish {
                // #16
                self.start_publishing_timer();
                self.keep_alive_counter -= 1;
                return UpdateStateResult::new(HandledState::KeepAlive16, UpdateStateAction::None);
            } else if p.publishing_timer_expired && !p.publishing_req_queued && (self.keep_alive_counter == 1 || (self.keep_alive_counter > 1 && enabled && p.notifications_available)) {
                // #17
                self.start_publishing_timer();
                self.state = SubscriptionState::Late;
                return UpdateStateResult::new(HandledState::KeepAlive17, UpdateStateAction::None);
            }
        }
        SubscriptionState::Closed => {
            // A closed subscription has no transitions.
        }
    }
    UpdateStateResult::new(HandledState::None0, UpdateStateAction::None)
}
fn tick_monitored_items(&mut self, now: &DateTimeUtc, address_space: &AddressSpace, publishing_interval_elapsed: bool, resend_data: bool) -> Option<NotificationMessage> {
let mut triggered_items: BTreeSet<u32> = BTreeSet::new();
let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2);
for (_, monitored_item) in &mut self.monitored_items {
let monitoring_mode = monitored_item.monitoring_mode();
match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data) {
TickResult::ReportValueChanged => {
if publishing_interval_elapsed {
match monitoring_mode {
MonitoringMode::Reporting => {
monitored_item.triggered_items().iter().for_each(|i| {
triggered_items.insert(*i);
})
}
_ => {
panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode);
}
}
if let Some(mut item_notification_messages) = monitored_item.all_notifications() {
monitored_item_notifications.append(&mut item_notification_messages);
}
}
}
TickResult::ValueChanged => {
if publishing_interval_elapsed {
match monitoring_mode {
MonitoringMode::Sampling => {
monitored_item.triggered_items().iter().for_each(|i| {
triggered_items.insert(*i);
})
}
_ => {
panic!("How can there be a value change when the mode is not sampling?");
}
}
}
}
TickResult::NoChange => {
}
}
}
triggered_items.iter().for_each(|i| {
if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) {
match monitored_item.monitoring_mode() {
MonitoringMode::Sampling => {
monitored_item.check_value(address_space, now, true);
if let Some(mut notifications) = monitored_item.all_notifications() {
monitored_item_notifications.append(&mut notifications);
}
}
MonitoringMode::Reporting => {
}
MonitoringMode::Disabled => {
}
}
} else {
}
});
if !monitored_item_notifications.is_empty() {
let next_sequence_number = self.sequence_number.next();
trace!("Create notification for subscription {}, sequence number {}", self.subscription_id, next_sequence_number);
let data_change_notifications = monitored_item_notifications.iter()
.filter(|v| matches!(v, Notification::MonitoredItemNotification(_)))
.map(|v| if let Notification::MonitoredItemNotification(v) = v { v.clone() } else { panic!() })
.collect();
let event_notifications = monitored_item_notifications.iter()
.filter(|v| matches!(v, Notification::Event(_)))
.map(|v| if let Notification::Event(v) = v { v.clone() } else { panic!() })
.collect();
let notification = NotificationMessage::data_change(next_sequence_number, DateTime::from(now.clone()), data_change_notifications, event_notifications);
Some(notification)
} else {
None
}
}
/// Restores the keep-alive counter to its configured maximum.
pub fn reset_keep_alive_counter(&mut self) {
self.keep_alive_counter = self.max_keep_alive_counter;
}
/// Restores the lifetime counter to its configured maximum.
pub fn reset_lifetime_counter(&mut self) {
self.lifetime_counter = self.max_lifetime_counter;
}
/// Counts one publishing interval against the subscription's lifetime by
/// decrementing the lifetime counter.
///
/// The decrement saturates at zero: a counter that is already 0 (for example a
/// subscription configured with a zero lifetime count) stays at 0 instead of
/// panicking in debug builds or wrapping to `u32::MAX` in release builds.
pub fn start_publishing_timer(&mut self) {
    self.lifetime_counter = self.lifetime_counter.saturating_sub(1);
    trace!("Decrementing life time counter {}", self.lifetime_counter);
}
/// Returns the id of this subscription.
pub fn subscription_id(&self) -> u32 {
self.subscription_id
}
/// Returns the current (running) lifetime counter.
pub fn lifetime_counter(&self) -> u32 {
self.lifetime_counter
}
/// Test-only: overwrites the running lifetime counter.
#[cfg(test)]
pub(crate) fn set_current_lifetime_count(&mut self, current_lifetime_count: u32) {
self.lifetime_counter = current_lifetime_count;
}
/// Returns the current (running) keep-alive counter.
pub fn keep_alive_counter(&self) -> u32 {
self.keep_alive_counter
}
/// Test-only: overwrites the running keep-alive counter.
#[cfg(test)]
pub(crate) fn set_keep_alive_counter(&mut self, keep_alive_counter: u32) {
self.keep_alive_counter = keep_alive_counter;
}
/// Test-only: returns the current state of the state machine.
#[cfg(test)]
pub(crate) fn state(&self) -> SubscriptionState {
self.state
}
/// Test-only: forces the state machine into the given state.
#[cfg(test)]
pub(crate) fn set_state(&mut self, state: SubscriptionState) {
self.state = state;
}
/// Returns whether the first message (notification or keep-alive) has been sent.
pub fn message_sent(&self) -> bool {
self.first_message_sent
}
/// Test-only: overwrites the first-message-sent flag.
#[cfg(test)]
pub(crate) fn set_message_sent(&mut self, message_sent: bool) {
self.first_message_sent = message_sent;
}
/// Returns the publishing interval.
pub fn publishing_interval(&self) -> Duration {
self.publishing_interval
}
/// Replaces the publishing interval and resets the lifetime counter.
pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
self.publishing_interval = publishing_interval;
self.reset_lifetime_counter();
}
/// Returns the maximum keep-alive count, i.e. the value the running counter is reset to.
pub fn max_keep_alive_count(&self) -> u32 {
self.max_keep_alive_counter
}
/// Replaces the maximum keep-alive count; the running counter is left untouched.
pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
self.max_keep_alive_counter = max_keep_alive_count;
}
/// Returns the maximum lifetime count, i.e. the value the running counter is reset to.
pub fn max_lifetime_count(&self) -> u32 {
self.max_lifetime_counter
}
/// Replaces the maximum lifetime count; the running counter is left untouched.
pub(crate) fn set_max_lifetime_count(&mut self, max_lifetime_count: u32) {
self.max_lifetime_counter = max_lifetime_count;
}
/// Returns the priority of the subscription.
pub fn priority(&self) -> u8 {
self.priority
}
/// Replaces the priority of the subscription.
pub(crate) fn set_priority(&mut self, priority: u8) {
self.priority = priority;
}
/// Enables or disables publishing and resets the lifetime counter.
pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
self.publishing_enabled = publishing_enabled;
self.reset_lifetime_counter();
}
/// Controls whether dropping this subscription is reported to the server diagnostics.
pub(crate) fn set_diagnostics_on_drop(&mut self, diagnostics_on_drop: bool) {
self.diagnostics_on_drop = diagnostics_on_drop;
}
/// Checks a list of candidate triggered-item ids against this subscription.
///
/// An id is valid when it names an existing monitored item other than
/// `monitored_item_id` itself. Returns one status per input id (`Good` or
/// `BadMonitoredItemIdInvalid`, in input order) together with the valid ids only.
fn validate_triggered_items(&self, monitored_item_id: u32, items: &[u32]) -> (Vec<StatusCode>, Vec<u32>) {
    let mut results: Vec<StatusCode> = Vec::with_capacity(items.len());
    let mut accepted: Vec<u32> = Vec::new();
    for &candidate in items {
        let is_valid = self.monitored_items.contains_key(&candidate) && candidate != monitored_item_id;
        if is_valid {
            results.push(StatusCode::Good);
            accepted.push(candidate);
        } else {
            results.push(StatusCode::BadMonitoredItemIdInvalid);
        }
    }
    (results, accepted)
}
pub(crate) fn set_triggering(&mut self, monitored_item_id: u32, items_to_add: &[u32], items_to_remove: &[u32]) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
let (add_results, items_to_add) = self.validate_triggered_items(monitored_item_id, items_to_add);
let (remove_results, items_to_remove) = self.validate_triggered_items(monitored_item_id, items_to_remove);
if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
monitored_item.set_triggering(items_to_add.as_slice(), items_to_remove.as_slice());
Ok((add_results, remove_results))
} else {
Err(StatusCode::BadMonitoredItemIdInvalid)
}
}
}