1use std::collections::{BTreeSet, HashMap, VecDeque};
6use std::sync::Arc;
7
8use crate::sync::*;
9use crate::types::{
10 service_types::{
11 MonitoredItemCreateRequest, MonitoredItemCreateResult, MonitoredItemModifyRequest,
12 MonitoredItemModifyResult, NotificationMessage, TimestampsToReturn,
13 },
14 status_code::StatusCode,
15 *,
16};
17
18use crate::core::handle::Handle;
19
20use crate::server::{
21 address_space::AddressSpace,
22 constants,
23 diagnostics::ServerDiagnostics,
24 state::ServerState,
25 subscriptions::monitored_item::{MonitoredItem, Notification, TickResult},
26};
27
28#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
30pub(crate) enum SubscriptionState {
31 Closed,
32 Creating,
33 Normal,
34 Late,
35 KeepAlive,
36}
37
/// Inputs that `Subscription::update_state` evaluates in addition to the subscription's own fields.
#[derive(Debug)]
pub(crate) struct SubscriptionStateParams {
    /// True when at least one notification is queued or was produced by the current tick.
    pub notifications_available: bool,
    /// True when more than one notification is waiting in the queue.
    pub more_notifications: bool,
    /// True when a publish request is queued and could carry a response.
    pub publishing_req_queued: bool,
    /// True when the publishing interval elapsed on this tick.
    pub publishing_timer_expired: bool,
}
45
/// What the caller of `Subscription::update_state` has to do after a state transition.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UpdateStateAction {
    /// Nothing needs to be sent.
    None,
    /// A keep-alive message has to be queued.
    ReturnKeepAlive,
    /// The notification produced by this tick has to be queued.
    ReturnNotifications,
    /// The subscription just left the creating state.
    SubscriptionCreated,
    /// The subscription's lifetime ran out and it was closed.
    SubscriptionExpired,
}
58
/// Identifies which branch of `Subscription::update_state` handled an update.
///
/// Each variant's discriminant equals the number in its name; presumably these are the
/// row numbers of the OPC UA subscription state table (to be confirmed against the spec).
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum HandledState {
    /// No branch matched.
    None0 = 0,
    /// Creating -> Normal.
    Create3 = 3,
    Normal4 = 4,
    Normal5 = 5,
    IntervalElapsed6 = 6,
    IntervalElapsed7 = 7,
    IntervalElapsed8 = 8,
    IntervalElapsed9 = 9,
    Late10 = 10,
    Late11 = 11,
    Late12 = 12,
    KeepAlive13 = 13,
    KeepAlive14 = 14,
    KeepAlive15 = 15,
    KeepAlive16 = 16,
    KeepAlive17 = 17,
    /// The lifetime counter ran out and the subscription was closed.
    Closed27 = 27,
}
83
84#[derive(Debug)]
86pub(crate) struct UpdateStateResult {
87 pub handled_state: HandledState,
88 pub update_state_action: UpdateStateAction,
89}
90
91impl UpdateStateResult {
92 pub fn new(
93 handled_state: HandledState,
94 update_state_action: UpdateStateAction,
95 ) -> UpdateStateResult {
96 UpdateStateResult {
97 handled_state,
98 update_state_action,
99 }
100 }
101}
102
/// Why `Subscription::tick` is being invoked.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum TickReason {
    /// A publish request was received.
    ReceivePublishRequest,
    /// The periodic tick timer fired.
    TickTimerFired,
}
108
109#[derive(Debug, Clone, Serialize)]
110pub struct Subscription {
111 subscription_id: u32,
113 publishing_interval: Duration,
115 max_lifetime_counter: u32,
117 max_keep_alive_counter: u32,
119 priority: u8,
123 monitored_items: HashMap<u32, MonitoredItem>,
125 state: SubscriptionState,
127 lifetime_counter: u32,
130 keep_alive_counter: u32,
133 first_message_sent: bool,
138 publishing_enabled: bool,
140 resend_data: bool,
143 sequence_number: Handle,
145 last_sequence_number: u32,
149 next_monitored_item_id: u32,
151 last_time_publishing_interval_elapsed: DateTimeUtc,
153 #[serde(skip)]
155 notifications: VecDeque<NotificationMessage>,
156 #[serde(skip)]
158 diagnostics: Arc<RwLock<ServerDiagnostics>>,
159 #[serde(skip)]
161 diagnostics_on_drop: bool,
162}
163
164impl Drop for Subscription {
165 fn drop(&mut self) {
166 if self.diagnostics_on_drop {
167 let mut diagnostics = trace_write_lock!(self.diagnostics);
168 diagnostics.on_destroy_subscription(self);
169 }
170 }
171}
172
173impl Subscription {
174 pub fn new(
175 diagnostics: Arc<RwLock<ServerDiagnostics>>,
176 subscription_id: u32,
177 publishing_enabled: bool,
178 publishing_interval: Duration,
179 lifetime_counter: u32,
180 keep_alive_counter: u32,
181 priority: u8,
182 ) -> Subscription {
183 let subscription = Subscription {
184 subscription_id,
185 publishing_interval,
186 priority,
187 monitored_items: HashMap::with_capacity(constants::DEFAULT_MONITORED_ITEM_CAPACITY),
188 max_lifetime_counter: lifetime_counter,
189 max_keep_alive_counter: keep_alive_counter,
190 state: SubscriptionState::Creating,
192 lifetime_counter,
193 keep_alive_counter,
194 first_message_sent: false,
195 publishing_enabled,
196 resend_data: false,
197 sequence_number: Handle::new(1),
199 last_sequence_number: 0,
200 next_monitored_item_id: 1,
201 last_time_publishing_interval_elapsed: chrono::Utc::now(),
202 notifications: VecDeque::with_capacity(100),
203 diagnostics,
204 diagnostics_on_drop: true,
205 };
206 {
207 let mut diagnostics = trace_write_lock!(subscription.diagnostics);
208 diagnostics.on_create_subscription(&subscription);
209 }
210 subscription
211 }
212
213 pub(crate) fn ready_to_remove(&self) -> bool {
214 self.state == SubscriptionState::Closed && self.notifications.is_empty()
215 }
216
217 fn monitored_item_create_error(status_code: StatusCode) -> MonitoredItemCreateResult {
219 MonitoredItemCreateResult {
220 status_code,
221 monitored_item_id: 0,
222 revised_sampling_interval: 0f64,
223 revised_queue_size: 0,
224 filter_result: ExtensionObject::null(),
225 }
226 }
227
228 pub fn monitored_items_len(&self) -> usize {
229 self.monitored_items.len()
230 }
231
232 pub fn create_monitored_items(
234 &mut self,
235 server_state: &ServerState,
236 address_space: &AddressSpace,
237 now: &DateTimeUtc,
238 timestamps_to_return: TimestampsToReturn,
239 items_to_create: &[MonitoredItemCreateRequest],
240 ) -> Vec<MonitoredItemCreateResult> {
241 self.reset_lifetime_counter();
242
243 items_to_create
245 .iter()
246 .map(|item_to_create| {
247 if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
248 Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown)
249 } else {
250 let monitored_item_id = self.next_monitored_item_id;
255 match MonitoredItem::new(
256 now,
257 monitored_item_id,
258 timestamps_to_return,
259 server_state,
260 item_to_create,
261 ) {
262 Ok(monitored_item) => {
263 if server_state.max_monitored_items_per_sub == 0
264 || self.monitored_items.len()
265 <= server_state.max_monitored_items_per_sub
266 {
267 let revised_sampling_interval = monitored_item.sampling_interval();
268 let revised_queue_size = monitored_item.queue_size() as u32;
269 match monitored_item.validate_filter(address_space) {
271 Ok(filter_result) => {
272 self.monitored_items
274 .insert(monitored_item_id, monitored_item);
275 self.next_monitored_item_id += 1;
276 MonitoredItemCreateResult {
277 status_code: StatusCode::Good,
278 monitored_item_id,
279 revised_sampling_interval,
280 revised_queue_size,
281 filter_result,
282 }
283 }
284 Err(status_code) => {
285 Self::monitored_item_create_error(status_code)
286 }
287 }
288 } else {
289 Self::monitored_item_create_error(
291 StatusCode::BadTooManyMonitoredItems,
292 )
293 }
294 }
295 Err(status_code) => Self::monitored_item_create_error(status_code),
296 }
297 }
298 })
299 .collect()
300 }
301
302 pub fn modify_monitored_items(
304 &mut self,
305 server_state: &ServerState,
306 address_space: &AddressSpace,
307 timestamps_to_return: TimestampsToReturn,
308 items_to_modify: &[MonitoredItemModifyRequest],
309 ) -> Vec<MonitoredItemModifyResult> {
310 self.reset_lifetime_counter();
311 items_to_modify
312 .iter()
313 .map(|item_to_modify| {
314 match self
315 .monitored_items
316 .get_mut(&item_to_modify.monitored_item_id)
317 {
318 Some(monitored_item) => {
319 let modify_result = monitored_item.modify(
321 server_state,
322 address_space,
323 timestamps_to_return,
324 item_to_modify,
325 );
326 match modify_result {
327 Ok(filter_result) => MonitoredItemModifyResult {
328 status_code: StatusCode::Good,
329 revised_sampling_interval: monitored_item.sampling_interval(),
330 revised_queue_size: monitored_item.queue_size() as u32,
331 filter_result,
332 },
333 Err(err) => MonitoredItemModifyResult {
334 status_code: err,
335 revised_sampling_interval: 0f64,
336 revised_queue_size: 0,
337 filter_result: ExtensionObject::null(),
338 },
339 }
340 }
341 None => MonitoredItemModifyResult {
343 status_code: StatusCode::BadMonitoredItemIdInvalid,
344 revised_sampling_interval: 0f64,
345 revised_queue_size: 0,
346 filter_result: ExtensionObject::null(),
347 },
348 }
349 })
350 .collect()
351 }
352
353 pub fn set_monitoring_mode(
355 &mut self,
356 monitored_item_id: u32,
357 monitoring_mode: MonitoringMode,
358 ) -> StatusCode {
359 if let Some(monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
360 monitored_item.set_monitoring_mode(monitoring_mode);
361 StatusCode::Good
362 } else {
363 StatusCode::BadMonitoredItemIdInvalid
364 }
365 }
366
367 pub fn delete_monitored_items(&mut self, items_to_delete: &[u32]) -> Vec<StatusCode> {
369 self.reset_lifetime_counter();
370 items_to_delete
371 .iter()
372 .map(
373 |item_to_delete| match self.monitored_items.remove(item_to_delete) {
374 Some(_) => StatusCode::Good,
375 None => StatusCode::BadMonitoredItemIdInvalid,
376 },
377 )
378 .collect()
379 }
380
381 pub fn get_handles(&self) -> (Vec<u32>, Vec<u32>) {
384 let server_handles = self
385 .monitored_items
386 .values()
387 .map(|i| i.monitored_item_id())
388 .collect();
389 let client_handles = self
390 .monitored_items
391 .values()
392 .map(|i| i.client_handle())
393 .collect();
394 (server_handles, client_handles)
395 }
396
397 pub fn set_resend_data(&mut self) {
400 self.resend_data = true;
401 }
402
403 fn test_and_set_publishing_interval_elapsed(&mut self, now: &DateTimeUtc) -> bool {
406 let publishing_interval = super::duration_from_ms(self.publishing_interval);
409 let elapsed = now
411 .signed_duration_since(self.last_time_publishing_interval_elapsed)
412 .to_std()
413 .unwrap();
414 if elapsed >= publishing_interval {
415 self.last_time_publishing_interval_elapsed = *now;
416 true
417 } else {
418 false
419 }
420 }
421
422 pub(crate) fn tick(
425 &mut self,
426 now: &DateTimeUtc,
427 address_space: &AddressSpace,
428 tick_reason: TickReason,
429 publishing_req_queued: bool,
430 ) {
431 let publishing_interval_elapsed = match tick_reason {
433 TickReason::ReceivePublishRequest => false,
434 TickReason::TickTimerFired => {
435 if self.state == SubscriptionState::Creating {
436 true
437 } else if self.publishing_interval <= 0f64 {
438 panic!("Publishing interval should have been revised to min interval")
439 } else {
440 self.test_and_set_publishing_interval_elapsed(now)
441 }
442 }
443 };
444
445 let notification = match self.state {
450 SubscriptionState::Closed | SubscriptionState::Creating => None,
451 _ => {
452 let resend_data = self.resend_data;
453 self.tick_monitored_items(
454 now,
455 address_space,
456 publishing_interval_elapsed,
457 resend_data,
458 )
459 }
460 };
461 self.resend_data = false;
462
463 let notifications_available = !self.notifications.is_empty() || notification.is_some();
464 let more_notifications = self.notifications.len() > 1;
465
466 if notifications_available || publishing_interval_elapsed || publishing_req_queued {
469 let update_state_result = self.update_state(
471 tick_reason,
472 SubscriptionStateParams {
473 publishing_req_queued,
474 notifications_available,
475 more_notifications,
476 publishing_timer_expired: publishing_interval_elapsed,
477 },
478 );
479 trace!(
480 "subscription tick - update_state_result = {:?}",
481 update_state_result
482 );
483 self.handle_state_result(now, update_state_result, notification);
484 }
485 }
486
487 fn enqueue_notification(&mut self, notification: NotificationMessage) {
488 use std::u32;
489 let expected_sequence_number = if self.last_sequence_number == u32::MAX {
491 1
492 } else {
493 self.last_sequence_number + 1
494 };
495 if notification.sequence_number != expected_sequence_number {
496 panic!(
497 "Notification's sequence number is not sequential, expecting {}, got {}",
498 expected_sequence_number, notification.sequence_number
499 );
500 }
501 self.last_sequence_number = notification.sequence_number;
503 self.notifications.push_back(notification);
504 }
505
506 fn handle_state_result(
507 &mut self,
508 now: &DateTimeUtc,
509 update_state_result: UpdateStateResult,
510 notification: Option<NotificationMessage>,
511 ) {
512 match update_state_result.update_state_action {
514 UpdateStateAction::None => {
515 if let Some(ref notification) = notification {
516 let notification_sequence_number = notification.sequence_number;
518 self.sequence_number.set_next(notification_sequence_number);
519 debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result);
520 }
521 }
523 UpdateStateAction::ReturnKeepAlive => {
524 if let Some(ref notification) = notification {
525 let notification_sequence_number = notification.sequence_number;
527 self.sequence_number.set_next(notification_sequence_number);
528 debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result);
529 }
530 debug!("Sending keep alive response");
532 let notification = NotificationMessage::keep_alive(
533 self.sequence_number.next(),
534 DateTime::from(*now),
535 );
536 self.enqueue_notification(notification);
537 }
538 UpdateStateAction::ReturnNotifications => {
539 if let Some(notification) = notification {
541 self.enqueue_notification(notification);
542 }
543 }
544 UpdateStateAction::SubscriptionCreated => {
545 if notification.is_some() {
546 panic!("SubscriptionCreated got a notification");
547 }
548 }
552 UpdateStateAction::SubscriptionExpired => {
553 if notification.is_some() {
554 panic!("SubscriptionExpired got a notification");
555 }
556 debug!("Subscription status change to closed / timeout");
558 self.monitored_items.clear();
559 let notification = NotificationMessage::status_change(
560 self.sequence_number.next(),
561 DateTime::from(*now),
562 StatusCode::BadTimeout,
563 );
564 self.enqueue_notification(notification);
565 }
566 }
567 }
568
569 pub(crate) fn take_notification(&mut self) -> Option<NotificationMessage> {
570 self.notifications.pop_front()
571 }
572
573 pub(crate) fn update_state(
596 &mut self,
597 tick_reason: TickReason,
598 p: SubscriptionStateParams,
599 ) -> UpdateStateResult {
600 if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired {
603 panic!("Should not be possible for timer to have expired and received publish request at same time")
604 }
605
606 {
608 use log::Level::Trace;
609 if log_enabled!(Trace) {
610 trace!(
611 r#"State inputs:
612 subscription_id: {} / state: {:?}
613 tick_reason: {:?} / state_params: {:?}
614 publishing_enabled: {}
615 keep_alive_counter / lifetime_counter: {} / {}
616 message_sent: {}"#,
617 self.subscription_id,
618 self.state,
619 tick_reason,
620 p,
621 self.publishing_enabled,
622 self.keep_alive_counter,
623 self.lifetime_counter,
624 self.first_message_sent
625 );
626 }
627 }
628
629 match self.state {
639 SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive => {
640 if self.lifetime_counter == 1 {
641 self.state = SubscriptionState::Closed;
643 return UpdateStateResult::new(
644 HandledState::Closed27,
645 UpdateStateAction::SubscriptionExpired,
646 );
647 }
648 }
649 _ => {
650 }
652 }
653
654 match self.state {
655 SubscriptionState::Creating => {
656 self.state = SubscriptionState::Normal;
661 self.first_message_sent = false;
662 return UpdateStateResult::new(
663 HandledState::Create3,
664 UpdateStateAction::SubscriptionCreated,
665 );
666 }
667 SubscriptionState::Normal => {
668 if tick_reason == TickReason::ReceivePublishRequest
669 && (!self.publishing_enabled
670 || (self.publishing_enabled && !p.more_notifications))
671 {
672 return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None);
674 } else if tick_reason == TickReason::ReceivePublishRequest
675 && self.publishing_enabled
676 && p.more_notifications
677 {
678 self.reset_lifetime_counter();
680 self.first_message_sent = true;
681 return UpdateStateResult::new(
682 HandledState::Normal5,
683 UpdateStateAction::ReturnNotifications,
684 );
685 } else if p.publishing_timer_expired
686 && p.publishing_req_queued
687 && self.publishing_enabled
688 && p.notifications_available
689 {
690 self.reset_lifetime_counter();
692 self.start_publishing_timer();
693 self.first_message_sent = true;
694 return UpdateStateResult::new(
695 HandledState::IntervalElapsed6,
696 UpdateStateAction::ReturnNotifications,
697 );
698 } else if p.publishing_timer_expired
699 && p.publishing_req_queued
700 && !self.first_message_sent
701 && (!self.publishing_enabled
702 || (self.publishing_enabled && !p.notifications_available))
703 {
704 self.reset_lifetime_counter();
706 self.start_publishing_timer();
707 self.first_message_sent = true;
708 return UpdateStateResult::new(
709 HandledState::IntervalElapsed7,
710 UpdateStateAction::ReturnKeepAlive,
711 );
712 } else if p.publishing_timer_expired
713 && !p.publishing_req_queued
714 && (!self.first_message_sent
715 || (self.publishing_enabled && p.notifications_available))
716 {
717 self.start_publishing_timer();
719 self.state = SubscriptionState::Late;
720 return UpdateStateResult::new(
721 HandledState::IntervalElapsed8,
722 UpdateStateAction::None,
723 );
724 } else if p.publishing_timer_expired
725 && self.first_message_sent
726 && (!self.publishing_enabled
727 || (self.publishing_enabled && !p.notifications_available))
728 {
729 self.start_publishing_timer();
731 self.reset_keep_alive_counter();
732 self.state = SubscriptionState::KeepAlive;
733 return UpdateStateResult::new(
734 HandledState::IntervalElapsed9,
735 UpdateStateAction::None,
736 );
737 }
738 }
739 SubscriptionState::Late => {
740 if tick_reason == TickReason::ReceivePublishRequest
741 && self.publishing_enabled
742 && (p.notifications_available || p.more_notifications)
743 {
744 self.reset_lifetime_counter();
746 self.state = SubscriptionState::Normal;
747 self.first_message_sent = true;
748 return UpdateStateResult::new(
749 HandledState::Late10,
750 UpdateStateAction::ReturnNotifications,
751 );
752 } else if tick_reason == TickReason::ReceivePublishRequest
753 && (!self.publishing_enabled
754 || (self.publishing_enabled
755 && !p.notifications_available
756 && !p.more_notifications))
757 {
758 self.reset_lifetime_counter();
760 self.state = SubscriptionState::KeepAlive;
761 self.first_message_sent = true;
762 return UpdateStateResult::new(
763 HandledState::Late11,
764 UpdateStateAction::ReturnKeepAlive,
765 );
766 } else if p.publishing_timer_expired {
767 self.start_publishing_timer();
769 return UpdateStateResult::new(HandledState::Late12, UpdateStateAction::None);
770 }
771 }
772 SubscriptionState::KeepAlive => {
773 if tick_reason == TickReason::ReceivePublishRequest {
774 return UpdateStateResult::new(
776 HandledState::KeepAlive13,
777 UpdateStateAction::None,
778 );
779 } else if p.publishing_timer_expired
780 && self.publishing_enabled
781 && p.notifications_available
782 && p.publishing_req_queued
783 {
784 self.first_message_sent = true;
786 self.state = SubscriptionState::Normal;
787 return UpdateStateResult::new(
788 HandledState::KeepAlive14,
789 UpdateStateAction::ReturnNotifications,
790 );
791 } else if p.publishing_timer_expired
792 && p.publishing_req_queued
793 && self.keep_alive_counter == 1
794 && (!self.publishing_enabled
795 || (self.publishing_enabled && p.notifications_available))
796 {
797 self.start_publishing_timer();
799 self.reset_keep_alive_counter();
800 return UpdateStateResult::new(
801 HandledState::KeepAlive15,
802 UpdateStateAction::ReturnKeepAlive,
803 );
804 } else if p.publishing_timer_expired
805 && self.keep_alive_counter > 1
806 && (!self.publishing_enabled
807 || (self.publishing_enabled && !p.notifications_available))
808 {
809 self.start_publishing_timer();
811 self.keep_alive_counter -= 1;
812 return UpdateStateResult::new(
813 HandledState::KeepAlive16,
814 UpdateStateAction::None,
815 );
816 } else if p.publishing_timer_expired
817 && !p.publishing_req_queued
818 && (self.keep_alive_counter == 1
819 || (self.keep_alive_counter > 1
820 && self.publishing_enabled
821 && p.notifications_available))
822 {
823 self.start_publishing_timer();
825 self.state = SubscriptionState::Late;
826 return UpdateStateResult::new(
827 HandledState::KeepAlive17,
828 UpdateStateAction::None,
829 );
830 }
831 }
832 _ => {
833 }
835 }
836
837 UpdateStateResult::new(HandledState::None0, UpdateStateAction::None)
838 }
839
840 fn tick_monitored_items(
848 &mut self,
849 now: &DateTimeUtc,
850 address_space: &AddressSpace,
851 publishing_interval_elapsed: bool,
852 resend_data: bool,
853 ) -> Option<NotificationMessage> {
854 let mut triggered_items: BTreeSet<u32> = BTreeSet::new();
855 let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2);
856
857 for monitored_item in self.monitored_items.values_mut() {
858 let monitoring_mode = monitored_item.monitoring_mode();
860 match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data)
861 {
862 TickResult::ReportValueChanged => {
863 if publishing_interval_elapsed {
864 match monitoring_mode {
866 MonitoringMode::Reporting => {
867 monitored_item.triggered_items().iter().for_each(|i| {
871 triggered_items.insert(*i);
872 })
873 }
874 _ => {
875 panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode);
877 }
878 }
879 if let Some(mut item_notification_messages) =
881 monitored_item.all_notifications()
882 {
883 monitored_item_notifications.append(&mut item_notification_messages);
884 }
885 }
886 }
887 TickResult::ValueChanged => {
888 if publishing_interval_elapsed {
891 match monitoring_mode {
892 MonitoringMode::Sampling => {
893 monitored_item.triggered_items().iter().for_each(|i| {
896 triggered_items.insert(*i);
897 })
898 }
899 _ => {
900 panic!("How can there be a value change when the mode is not sampling?");
902 }
903 }
904 }
905 }
906 TickResult::NoChange => {
907 }
909 }
910 }
911
912 triggered_items.iter().for_each(|i| {
914 if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) {
915 match monitored_item.monitoring_mode() {
917 MonitoringMode::Sampling => {
918 monitored_item.check_value(address_space, now, true);
923 if let Some(mut notifications) = monitored_item.all_notifications() {
924 monitored_item_notifications.append(&mut notifications);
925 }
926 }
927 MonitoringMode::Reporting => {
928 }
934 MonitoringMode::Disabled => {
935 }
937 }
938 } else {
939 }
942 });
943
944 if !monitored_item_notifications.is_empty() {
946 let next_sequence_number = self.sequence_number.next();
947
948 trace!(
949 "Create notification for subscription {}, sequence number {}",
950 self.subscription_id,
951 next_sequence_number
952 );
953
954 let data_change_notifications = monitored_item_notifications
956 .iter()
957 .filter(|v| matches!(v, Notification::MonitoredItemNotification(_)))
958 .map(|v| {
959 if let Notification::MonitoredItemNotification(v) = v {
960 v.clone()
961 } else {
962 panic!()
963 }
964 })
965 .collect();
966
967 let event_notifications = monitored_item_notifications
969 .iter()
970 .filter(|v| matches!(v, Notification::Event(_)))
971 .map(|v| {
972 if let Notification::Event(v) = v {
973 v.clone()
974 } else {
975 panic!()
976 }
977 })
978 .collect();
979
980 let notification = NotificationMessage::data_change(
982 next_sequence_number,
983 DateTime::from(*now),
984 data_change_notifications,
985 event_notifications,
986 );
987 Some(notification)
988 } else {
989 None
990 }
991 }
992
993 pub fn reset_keep_alive_counter(&mut self) {
997 self.keep_alive_counter = self.max_keep_alive_counter;
998 }
999
1000 pub fn reset_lifetime_counter(&mut self) {
1003 self.lifetime_counter = self.max_lifetime_counter;
1004 }
1005
1006 pub fn start_publishing_timer(&mut self) {
1008 self.lifetime_counter -= 1;
1009 trace!("Decrementing life time counter {}", self.lifetime_counter);
1010 }
1011
1012 pub fn subscription_id(&self) -> u32 {
1013 self.subscription_id
1014 }
1015
1016 pub fn lifetime_counter(&self) -> u32 {
1017 self.lifetime_counter
1018 }
1019
/// Test helper: overwrites the current lifetime counter.
#[cfg(test)]
pub(crate) fn set_current_lifetime_count(&mut self, current_lifetime_count: u32) {
    self.lifetime_counter = current_lifetime_count;
}
1024
1025 pub fn keep_alive_counter(&self) -> u32 {
1026 self.keep_alive_counter
1027 }
1028
/// Test helper: overwrites the current keep-alive counter.
#[cfg(test)]
pub(crate) fn set_keep_alive_counter(&mut self, keep_alive_counter: u32) {
    self.keep_alive_counter = keep_alive_counter;
}
1033
/// Test helper: current state of the subscription state machine.
#[cfg(test)]
pub(crate) fn state(&self) -> SubscriptionState {
    self.state
}
1038
/// Test helper: forces the subscription into the given state.
#[cfg(test)]
pub(crate) fn set_state(&mut self, state: SubscriptionState) {
    self.state = state;
}
1043
1044 pub fn message_sent(&self) -> bool {
1045 self.first_message_sent
1046 }
1047
/// Test helper: overwrites the first-message-sent flag.
#[cfg(test)]
pub(crate) fn set_message_sent(&mut self, message_sent: bool) {
    self.first_message_sent = message_sent;
}
1052
1053 pub fn publishing_interval(&self) -> Duration {
1054 self.publishing_interval
1055 }
1056
1057 pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
1058 self.publishing_interval = publishing_interval;
1059 self.reset_lifetime_counter();
1060 }
1061
1062 pub fn max_keep_alive_count(&self) -> u32 {
1063 self.max_keep_alive_counter
1064 }
1065
1066 pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
1067 self.max_keep_alive_counter = max_keep_alive_count;
1068 }
1069
1070 pub fn max_lifetime_count(&self) -> u32 {
1071 self.max_lifetime_counter
1072 }
1073
1074 pub(crate) fn set_max_lifetime_count(&mut self, max_lifetime_count: u32) {
1075 self.max_lifetime_counter = max_lifetime_count;
1076 }
1077
1078 pub fn priority(&self) -> u8 {
1079 self.priority
1080 }
1081
1082 pub(crate) fn set_priority(&mut self, priority: u8) {
1083 self.priority = priority;
1084 }
1085
1086 pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
1087 self.publishing_enabled = publishing_enabled;
1088 self.reset_lifetime_counter();
1089 }
1090
1091 pub(crate) fn set_diagnostics_on_drop(&mut self, diagnostics_on_drop: bool) {
1092 self.diagnostics_on_drop = diagnostics_on_drop;
1093 }
1094
1095 fn validate_triggered_items(
1096 &self,
1097 monitored_item_id: u32,
1098 items: &[u32],
1099 ) -> (Vec<StatusCode>, Vec<u32>) {
1100 let is_good_monitored_item =
1102 |i| self.monitored_items.contains_key(i) && *i != monitored_item_id;
1103 let is_good_monitored_item_result = |i| {
1104 if is_good_monitored_item(i) {
1105 StatusCode::Good
1106 } else {
1107 StatusCode::BadMonitoredItemIdInvalid
1108 }
1109 };
1110
1111 let results: Vec<StatusCode> = items.iter().map(is_good_monitored_item_result).collect();
1113 let items: Vec<u32> = items
1114 .iter()
1115 .filter(|i| is_good_monitored_item(i))
1116 .copied()
1117 .collect();
1118
1119 (results, items)
1120 }
1121
1122 pub(crate) fn set_triggering(
1126 &mut self,
1127 monitored_item_id: u32,
1128 items_to_add: &[u32],
1129 items_to_remove: &[u32],
1130 ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
1131 let (add_results, items_to_add) =
1133 self.validate_triggered_items(monitored_item_id, items_to_add);
1134 let (remove_results, items_to_remove) =
1135 self.validate_triggered_items(monitored_item_id, items_to_remove);
1136
1137 if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
1138 monitored_item.set_triggering(items_to_add.as_slice(), items_to_remove.as_slice());
1140
1141 Ok((add_results, remove_results))
1142 } else {
1143 Err(StatusCode::BadMonitoredItemIdInvalid)
1145 }
1146 }
1147}