1use std::collections::{BTreeSet, HashMap, VecDeque};
6use std::sync::{Arc, RwLock};
7
8use opcua_types::{
9 service_types::{
10 MonitoredItemCreateRequest, MonitoredItemCreateResult, MonitoredItemModifyRequest,
11 MonitoredItemModifyResult, NotificationMessage, TimestampsToReturn,
12 },
13 status_code::StatusCode,
14 *,
15};
16
17use opcua_core::handle::Handle;
18
19use crate::{
20 address_space::AddressSpace,
21 constants,
22 diagnostics::ServerDiagnostics,
23 state::ServerState,
24 subscriptions::monitored_item::{MonitoredItem, Notification, TickResult},
25};
26
27#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
29pub(crate) enum SubscriptionState {
30 Closed,
31 Creating,
32 Normal,
33 Late,
34 KeepAlive,
35}
36
37#[derive(Debug)]
38pub(crate) struct SubscriptionStateParams {
39 pub notifications_available: bool,
40 pub more_notifications: bool,
41 pub publishing_req_queued: bool,
42 pub publishing_timer_expired: bool,
43}
44
45#[derive(Debug, Copy, Clone, PartialEq)]
46pub enum UpdateStateAction {
47 None,
48 ReturnKeepAlive,
50 ReturnNotifications,
52 SubscriptionCreated,
54 SubscriptionExpired,
56}
57
58#[derive(Debug, Copy, Clone, PartialEq)]
63pub(crate) enum HandledState {
64 None0 = 0,
65 Create3 = 3,
66 Normal4 = 4,
67 Normal5 = 5,
68 IntervalElapsed6 = 6,
69 IntervalElapsed7 = 7,
70 IntervalElapsed8 = 8,
71 IntervalElapsed9 = 9,
72 Late10 = 10,
73 Late11 = 11,
74 Late12 = 12,
75 KeepAlive13 = 13,
76 KeepAlive14 = 14,
77 KeepAlive15 = 15,
78 KeepAlive16 = 16,
79 KeepAlive17 = 17,
80 Closed27 = 27,
81}
82
83#[derive(Debug)]
85pub(crate) struct UpdateStateResult {
86 pub handled_state: HandledState,
87 pub update_state_action: UpdateStateAction,
88}
89
90impl UpdateStateResult {
91 pub fn new(
92 handled_state: HandledState,
93 update_state_action: UpdateStateAction,
94 ) -> UpdateStateResult {
95 UpdateStateResult {
96 handled_state,
97 update_state_action,
98 }
99 }
100}
101
102#[derive(Debug, Copy, Clone, PartialEq)]
103pub(crate) enum TickReason {
104 ReceivePublishRequest,
105 TickTimerFired,
106}
107
108#[derive(Debug, Clone, Serialize)]
109pub struct Subscription {
110 subscription_id: u32,
112 publishing_interval: Duration,
114 max_lifetime_counter: u32,
116 max_keep_alive_counter: u32,
118 priority: u8,
122 monitored_items: HashMap<u32, MonitoredItem>,
124 state: SubscriptionState,
126 lifetime_counter: u32,
129 keep_alive_counter: u32,
132 first_message_sent: bool,
137 publishing_enabled: bool,
139 resend_data: bool,
142 sequence_number: Handle,
144 last_sequence_number: u32,
148 next_monitored_item_id: u32,
150 last_time_publishing_interval_elapsed: DateTimeUtc,
152 #[serde(skip)]
154 notifications: VecDeque<NotificationMessage>,
155 #[serde(skip)]
157 diagnostics: Arc<RwLock<ServerDiagnostics>>,
158 #[serde(skip)]
160 diagnostics_on_drop: bool,
161}
162
163impl Drop for Subscription {
164 fn drop(&mut self) {
165 if self.diagnostics_on_drop {
166 let mut diagnostics = trace_write_lock!(self.diagnostics);
167 diagnostics.on_destroy_subscription(self);
168 }
169 }
170}
171
172impl Subscription {
173 pub fn new(
174 diagnostics: Arc<RwLock<ServerDiagnostics>>,
175 subscription_id: u32,
176 publishing_enabled: bool,
177 publishing_interval: Duration,
178 lifetime_counter: u32,
179 keep_alive_counter: u32,
180 priority: u8,
181 ) -> Subscription {
182 let subscription = Subscription {
183 subscription_id,
184 publishing_interval,
185 priority,
186 monitored_items: HashMap::with_capacity(constants::DEFAULT_MONITORED_ITEM_CAPACITY),
187 max_lifetime_counter: lifetime_counter,
188 max_keep_alive_counter: keep_alive_counter,
189 state: SubscriptionState::Creating,
191 lifetime_counter,
192 keep_alive_counter,
193 first_message_sent: false,
194 publishing_enabled,
195 resend_data: false,
196 sequence_number: Handle::new(1),
198 last_sequence_number: 0,
199 next_monitored_item_id: 1,
200 last_time_publishing_interval_elapsed: chrono::Utc::now(),
201 notifications: VecDeque::with_capacity(100),
202 diagnostics,
203 diagnostics_on_drop: true,
204 };
205 {
206 let mut diagnostics = trace_write_lock!(subscription.diagnostics);
207 diagnostics.on_create_subscription(&subscription);
208 }
209 subscription
210 }
211
212 pub(crate) fn ready_to_remove(&self) -> bool {
213 self.state == SubscriptionState::Closed && self.notifications.is_empty()
214 }
215
216 fn monitored_item_create_error(status_code: StatusCode) -> MonitoredItemCreateResult {
218 MonitoredItemCreateResult {
219 status_code,
220 monitored_item_id: 0,
221 revised_sampling_interval: 0f64,
222 revised_queue_size: 0,
223 filter_result: ExtensionObject::null(),
224 }
225 }
226
227 pub fn monitored_items_len(&self) -> usize {
228 self.monitored_items.len()
229 }
230
231 pub fn create_monitored_items(
233 &mut self,
234 server_state: &ServerState,
235 address_space: &AddressSpace,
236 now: &DateTimeUtc,
237 timestamps_to_return: TimestampsToReturn,
238 items_to_create: &[MonitoredItemCreateRequest],
239 ) -> Vec<MonitoredItemCreateResult> {
240 self.reset_lifetime_counter();
241
242 items_to_create
244 .iter()
245 .map(|item_to_create| {
246 if !address_space.node_exists(&item_to_create.item_to_monitor.node_id) {
247 Self::monitored_item_create_error(StatusCode::BadNodeIdUnknown)
248 } else {
249 let monitored_item_id = self.next_monitored_item_id;
254 match MonitoredItem::new(
255 now,
256 monitored_item_id,
257 timestamps_to_return,
258 server_state,
259 item_to_create,
260 ) {
261 Ok(monitored_item) => {
262 if server_state.max_monitored_items_per_sub == 0
263 || self.monitored_items.len()
264 <= server_state.max_monitored_items_per_sub
265 {
266 let revised_sampling_interval = monitored_item.sampling_interval();
267 let revised_queue_size = monitored_item.queue_size() as u32;
268 match monitored_item.validate_filter(address_space) {
270 Ok(filter_result) => {
271 self.monitored_items
273 .insert(monitored_item_id, monitored_item);
274 self.next_monitored_item_id += 1;
275 MonitoredItemCreateResult {
276 status_code: StatusCode::Good,
277 monitored_item_id,
278 revised_sampling_interval,
279 revised_queue_size,
280 filter_result,
281 }
282 }
283 Err(status_code) => {
284 Self::monitored_item_create_error(status_code)
285 }
286 }
287 } else {
288 Self::monitored_item_create_error(
290 StatusCode::BadTooManyMonitoredItems,
291 )
292 }
293 }
294 Err(status_code) => Self::monitored_item_create_error(status_code),
295 }
296 }
297 })
298 .collect()
299 }
300
301 pub fn modify_monitored_items(
303 &mut self,
304 server_state: &ServerState,
305 address_space: &AddressSpace,
306 timestamps_to_return: TimestampsToReturn,
307 items_to_modify: &[MonitoredItemModifyRequest],
308 ) -> Vec<MonitoredItemModifyResult> {
309 self.reset_lifetime_counter();
310 items_to_modify
311 .iter()
312 .map(|item_to_modify| {
313 match self
314 .monitored_items
315 .get_mut(&item_to_modify.monitored_item_id)
316 {
317 Some(monitored_item) => {
318 let modify_result = monitored_item.modify(
320 server_state,
321 address_space,
322 timestamps_to_return,
323 item_to_modify,
324 );
325 match modify_result {
326 Ok(filter_result) => MonitoredItemModifyResult {
327 status_code: StatusCode::Good,
328 revised_sampling_interval: monitored_item.sampling_interval(),
329 revised_queue_size: monitored_item.queue_size() as u32,
330 filter_result,
331 },
332 Err(err) => MonitoredItemModifyResult {
333 status_code: err,
334 revised_sampling_interval: 0f64,
335 revised_queue_size: 0,
336 filter_result: ExtensionObject::null(),
337 },
338 }
339 }
340 None => MonitoredItemModifyResult {
342 status_code: StatusCode::BadMonitoredItemIdInvalid,
343 revised_sampling_interval: 0f64,
344 revised_queue_size: 0,
345 filter_result: ExtensionObject::null(),
346 },
347 }
348 })
349 .collect()
350 }
351
352 pub fn set_monitoring_mode(
354 &mut self,
355 monitored_item_id: u32,
356 monitoring_mode: MonitoringMode,
357 ) -> StatusCode {
358 if let Some(monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
359 monitored_item.set_monitoring_mode(monitoring_mode);
360 StatusCode::Good
361 } else {
362 StatusCode::BadMonitoredItemIdInvalid
363 }
364 }
365
366 pub fn delete_monitored_items(&mut self, items_to_delete: &[u32]) -> Vec<StatusCode> {
368 self.reset_lifetime_counter();
369 items_to_delete
370 .iter()
371 .map(
372 |item_to_delete| match self.monitored_items.remove(item_to_delete) {
373 Some(_) => StatusCode::Good,
374 None => StatusCode::BadMonitoredItemIdInvalid,
375 },
376 )
377 .collect()
378 }
379
380 pub fn get_handles(&self) -> (Vec<u32>, Vec<u32>) {
383 let server_handles = self
384 .monitored_items
385 .values()
386 .map(|i| i.monitored_item_id())
387 .collect();
388 let client_handles = self
389 .monitored_items
390 .values()
391 .map(|i| i.client_handle())
392 .collect();
393 (server_handles, client_handles)
394 }
395
396 pub fn set_resend_data(&mut self) {
399 self.resend_data = true;
400 }
401
402 fn test_and_set_publishing_interval_elapsed(&mut self, now: &DateTimeUtc) -> bool {
405 let publishing_interval = super::duration_from_ms(self.publishing_interval);
408 let elapsed = now.signed_duration_since(self.last_time_publishing_interval_elapsed);
409 if elapsed >= publishing_interval {
410 self.last_time_publishing_interval_elapsed = *now;
411 true
412 } else {
413 false
414 }
415 }
416
417 pub(crate) fn tick(
420 &mut self,
421 now: &DateTimeUtc,
422 address_space: &AddressSpace,
423 tick_reason: TickReason,
424 publishing_req_queued: bool,
425 ) {
426 let publishing_interval_elapsed = match tick_reason {
428 TickReason::ReceivePublishRequest => false,
429 TickReason::TickTimerFired => {
430 if self.state == SubscriptionState::Creating {
431 true
432 } else if self.publishing_interval <= 0f64 {
433 panic!("Publishing interval should have been revised to min interval")
434 } else {
435 self.test_and_set_publishing_interval_elapsed(now)
436 }
437 }
438 };
439
440 let notification = match self.state {
445 SubscriptionState::Closed | SubscriptionState::Creating => None,
446 _ => {
447 let resend_data = self.resend_data;
448 self.tick_monitored_items(
449 now,
450 address_space,
451 publishing_interval_elapsed,
452 resend_data,
453 )
454 }
455 };
456 self.resend_data = false;
457
458 let notifications_available = !self.notifications.is_empty() || notification.is_some();
459 let more_notifications = self.notifications.len() > 1;
460
461 if notifications_available || publishing_interval_elapsed || publishing_req_queued {
464 let update_state_result = self.update_state(
466 tick_reason,
467 SubscriptionStateParams {
468 publishing_req_queued,
469 notifications_available,
470 more_notifications,
471 publishing_timer_expired: publishing_interval_elapsed,
472 },
473 );
474 trace!(
475 "subscription tick - update_state_result = {:?}",
476 update_state_result
477 );
478 self.handle_state_result(now, update_state_result, notification);
479 }
480 }
481
482 fn enqueue_notification(&mut self, notification: NotificationMessage) {
483 use std::u32;
484 let expected_sequence_number = if self.last_sequence_number == u32::MAX {
486 1
487 } else {
488 self.last_sequence_number + 1
489 };
490 if notification.sequence_number != expected_sequence_number {
491 panic!(
492 "Notification's sequence number is not sequential, expecting {}, got {}",
493 expected_sequence_number, notification.sequence_number
494 );
495 }
496 self.last_sequence_number = notification.sequence_number;
498 self.notifications.push_back(notification);
499 }
500
501 fn handle_state_result(
502 &mut self,
503 now: &DateTimeUtc,
504 update_state_result: UpdateStateResult,
505 notification: Option<NotificationMessage>,
506 ) {
507 match update_state_result.update_state_action {
509 UpdateStateAction::None => {
510 if let Some(ref notification) = notification {
511 let notification_sequence_number = notification.sequence_number;
513 self.sequence_number.set_next(notification_sequence_number);
514 debug!("Notification message nr {} was being ignored for a do-nothing, update state was {:?}", notification_sequence_number, update_state_result);
515 }
516 }
518 UpdateStateAction::ReturnKeepAlive => {
519 if let Some(ref notification) = notification {
520 let notification_sequence_number = notification.sequence_number;
522 self.sequence_number.set_next(notification_sequence_number);
523 debug!("Notification message nr {} was being ignored for a keep alive, update state was {:?}", notification_sequence_number, update_state_result);
524 }
525 debug!("Sending keep alive response");
527 let notification = NotificationMessage::keep_alive(
528 self.sequence_number.next(),
529 DateTime::from(*now),
530 );
531 self.enqueue_notification(notification);
532 }
533 UpdateStateAction::ReturnNotifications => {
534 if let Some(notification) = notification {
536 self.enqueue_notification(notification);
537 }
538 }
539 UpdateStateAction::SubscriptionCreated => {
540 if notification.is_some() {
541 panic!("SubscriptionCreated got a notification");
542 }
543 }
547 UpdateStateAction::SubscriptionExpired => {
548 if notification.is_some() {
549 panic!("SubscriptionExpired got a notification");
550 }
551 debug!("Subscription status change to closed / timeout");
553 self.monitored_items.clear();
554 let notification = NotificationMessage::status_change(
555 self.sequence_number.next(),
556 DateTime::from(*now),
557 StatusCode::BadTimeout,
558 );
559 self.enqueue_notification(notification);
560 }
561 }
562 }
563
564 pub(crate) fn take_notification(&mut self) -> Option<NotificationMessage> {
565 self.notifications.pop_front()
566 }
567
568 pub(crate) fn update_state(
591 &mut self,
592 tick_reason: TickReason,
593 p: SubscriptionStateParams,
594 ) -> UpdateStateResult {
595 if tick_reason == TickReason::ReceivePublishRequest && p.publishing_timer_expired {
598 panic!("Should not be possible for timer to have expired and received publish request at same time")
599 }
600
601 {
603 use log::Level::Trace;
604 if log_enabled!(Trace) {
605 trace!(
606 r#"State inputs:
607 subscription_id: {} / state: {:?}
608 tick_reason: {:?} / state_params: {:?}
609 publishing_enabled: {}
610 keep_alive_counter / lifetime_counter: {} / {}
611 message_sent: {}"#,
612 self.subscription_id,
613 self.state,
614 tick_reason,
615 p,
616 self.publishing_enabled,
617 self.keep_alive_counter,
618 self.lifetime_counter,
619 self.first_message_sent
620 );
621 }
622 }
623
624 match self.state {
634 SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive => {
635 if self.lifetime_counter == 1 {
636 self.state = SubscriptionState::Closed;
638 return UpdateStateResult::new(
639 HandledState::Closed27,
640 UpdateStateAction::SubscriptionExpired,
641 );
642 }
643 }
644 _ => {
645 }
647 }
648
649 match self.state {
650 SubscriptionState::Creating => {
651 self.state = SubscriptionState::Normal;
656 self.first_message_sent = false;
657 return UpdateStateResult::new(
658 HandledState::Create3,
659 UpdateStateAction::SubscriptionCreated,
660 );
661 }
662 SubscriptionState::Normal => {
663 if tick_reason == TickReason::ReceivePublishRequest
664 && (!self.publishing_enabled
665 || (self.publishing_enabled && !p.more_notifications))
666 {
667 return UpdateStateResult::new(HandledState::Normal4, UpdateStateAction::None);
669 } else if tick_reason == TickReason::ReceivePublishRequest
670 && self.publishing_enabled
671 && p.more_notifications
672 {
673 self.reset_lifetime_counter();
675 self.first_message_sent = true;
676 return UpdateStateResult::new(
677 HandledState::Normal5,
678 UpdateStateAction::ReturnNotifications,
679 );
680 } else if p.publishing_timer_expired
681 && p.publishing_req_queued
682 && self.publishing_enabled
683 && p.notifications_available
684 {
685 self.reset_lifetime_counter();
687 self.start_publishing_timer();
688 self.first_message_sent = true;
689 return UpdateStateResult::new(
690 HandledState::IntervalElapsed6,
691 UpdateStateAction::ReturnNotifications,
692 );
693 } else if p.publishing_timer_expired
694 && p.publishing_req_queued
695 && !self.first_message_sent
696 && (!self.publishing_enabled
697 || (self.publishing_enabled && !p.notifications_available))
698 {
699 self.reset_lifetime_counter();
701 self.start_publishing_timer();
702 self.first_message_sent = true;
703 return UpdateStateResult::new(
704 HandledState::IntervalElapsed7,
705 UpdateStateAction::ReturnKeepAlive,
706 );
707 } else if p.publishing_timer_expired
708 && !p.publishing_req_queued
709 && (!self.first_message_sent
710 || (self.publishing_enabled && p.notifications_available))
711 {
712 self.start_publishing_timer();
714 self.state = SubscriptionState::Late;
715 return UpdateStateResult::new(
716 HandledState::IntervalElapsed8,
717 UpdateStateAction::None,
718 );
719 } else if p.publishing_timer_expired
720 && self.first_message_sent
721 && (!self.publishing_enabled
722 || (self.publishing_enabled && !p.notifications_available))
723 {
724 self.start_publishing_timer();
726 self.reset_keep_alive_counter();
727 self.state = SubscriptionState::KeepAlive;
728 return UpdateStateResult::new(
729 HandledState::IntervalElapsed9,
730 UpdateStateAction::None,
731 );
732 }
733 }
734 SubscriptionState::Late => {
735 if tick_reason == TickReason::ReceivePublishRequest
736 && self.publishing_enabled
737 && (p.notifications_available || p.more_notifications)
738 {
739 self.reset_lifetime_counter();
741 self.state = SubscriptionState::Normal;
742 self.first_message_sent = true;
743 return UpdateStateResult::new(
744 HandledState::Late10,
745 UpdateStateAction::ReturnNotifications,
746 );
747 } else if tick_reason == TickReason::ReceivePublishRequest
748 && (!self.publishing_enabled
749 || (self.publishing_enabled
750 && !p.notifications_available
751 && !p.more_notifications))
752 {
753 self.reset_lifetime_counter();
755 self.state = SubscriptionState::KeepAlive;
756 self.first_message_sent = true;
757 return UpdateStateResult::new(
758 HandledState::Late11,
759 UpdateStateAction::ReturnKeepAlive,
760 );
761 } else if p.publishing_timer_expired {
762 self.start_publishing_timer();
764 return UpdateStateResult::new(HandledState::Late12, UpdateStateAction::None);
765 }
766 }
767 SubscriptionState::KeepAlive => {
768 if tick_reason == TickReason::ReceivePublishRequest {
769 return UpdateStateResult::new(
771 HandledState::KeepAlive13,
772 UpdateStateAction::None,
773 );
774 } else if p.publishing_timer_expired
775 && self.publishing_enabled
776 && p.notifications_available
777 && p.publishing_req_queued
778 {
779 self.first_message_sent = true;
781 self.state = SubscriptionState::Normal;
782 return UpdateStateResult::new(
783 HandledState::KeepAlive14,
784 UpdateStateAction::ReturnNotifications,
785 );
786 } else if p.publishing_timer_expired
787 && p.publishing_req_queued
788 && self.keep_alive_counter == 1
789 && (!self.publishing_enabled
790 || (self.publishing_enabled && p.notifications_available))
791 {
792 self.start_publishing_timer();
794 self.reset_keep_alive_counter();
795 return UpdateStateResult::new(
796 HandledState::KeepAlive15,
797 UpdateStateAction::ReturnKeepAlive,
798 );
799 } else if p.publishing_timer_expired
800 && self.keep_alive_counter > 1
801 && (!self.publishing_enabled
802 || (self.publishing_enabled && !p.notifications_available))
803 {
804 self.start_publishing_timer();
806 self.keep_alive_counter -= 1;
807 return UpdateStateResult::new(
808 HandledState::KeepAlive16,
809 UpdateStateAction::None,
810 );
811 } else if p.publishing_timer_expired
812 && !p.publishing_req_queued
813 && (self.keep_alive_counter == 1
814 || (self.keep_alive_counter > 1
815 && self.publishing_enabled
816 && p.notifications_available))
817 {
818 self.start_publishing_timer();
820 self.state = SubscriptionState::Late;
821 return UpdateStateResult::new(
822 HandledState::KeepAlive17,
823 UpdateStateAction::None,
824 );
825 }
826 }
827 _ => {
828 }
830 }
831
832 UpdateStateResult::new(HandledState::None0, UpdateStateAction::None)
833 }
834
835 fn tick_monitored_items(
843 &mut self,
844 now: &DateTimeUtc,
845 address_space: &AddressSpace,
846 publishing_interval_elapsed: bool,
847 resend_data: bool,
848 ) -> Option<NotificationMessage> {
849 let mut triggered_items: BTreeSet<u32> = BTreeSet::new();
850 let mut monitored_item_notifications = Vec::with_capacity(self.monitored_items.len() * 2);
851
852 for monitored_item in self.monitored_items.values_mut() {
853 let monitoring_mode = monitored_item.monitoring_mode();
855 match monitored_item.tick(now, address_space, publishing_interval_elapsed, resend_data)
856 {
857 TickResult::ReportValueChanged => {
858 if publishing_interval_elapsed {
859 match monitoring_mode {
861 MonitoringMode::Reporting => {
862 monitored_item.triggered_items().iter().for_each(|i| {
866 triggered_items.insert(*i);
867 })
868 }
869 _ => {
870 panic!("How can there be changes to report when monitored item is in this monitoring mode {:?}", monitoring_mode);
872 }
873 }
874 if let Some(mut item_notification_messages) =
876 monitored_item.all_notifications()
877 {
878 monitored_item_notifications.append(&mut item_notification_messages);
879 }
880 }
881 }
882 TickResult::ValueChanged => {
883 if publishing_interval_elapsed {
886 match monitoring_mode {
887 MonitoringMode::Sampling => {
888 monitored_item.triggered_items().iter().for_each(|i| {
891 triggered_items.insert(*i);
892 })
893 }
894 _ => {
895 panic!("How can there be a value change when the mode is not sampling?");
897 }
898 }
899 }
900 }
901 TickResult::NoChange => {
902 }
904 }
905 }
906
907 triggered_items.iter().for_each(|i| {
909 if let Some(ref mut monitored_item) = self.monitored_items.get_mut(i) {
910 match monitored_item.monitoring_mode() {
912 MonitoringMode::Sampling => {
913 monitored_item.check_value(address_space, now, true);
918 if let Some(mut notifications) = monitored_item.all_notifications() {
919 monitored_item_notifications.append(&mut notifications);
920 }
921 }
922 MonitoringMode::Reporting => {
923 }
929 MonitoringMode::Disabled => {
930 }
932 }
933 } else {
934 }
937 });
938
939 if !monitored_item_notifications.is_empty() {
941 let next_sequence_number = self.sequence_number.next();
942
943 trace!(
944 "Create notification for subscription {}, sequence number {}",
945 self.subscription_id,
946 next_sequence_number
947 );
948
949 let data_change_notifications = monitored_item_notifications
951 .iter()
952 .filter(|v| matches!(v, Notification::MonitoredItemNotification(_)))
953 .map(|v| {
954 if let Notification::MonitoredItemNotification(v) = v {
955 v.clone()
956 } else {
957 panic!()
958 }
959 })
960 .collect();
961
962 let event_notifications = monitored_item_notifications
964 .iter()
965 .filter(|v| matches!(v, Notification::Event(_)))
966 .map(|v| {
967 if let Notification::Event(v) = v {
968 v.clone()
969 } else {
970 panic!()
971 }
972 })
973 .collect();
974
975 let notification = NotificationMessage::data_change(
977 next_sequence_number,
978 DateTime::from(*now),
979 data_change_notifications,
980 event_notifications,
981 );
982 Some(notification)
983 } else {
984 None
985 }
986 }
987
988 pub fn reset_keep_alive_counter(&mut self) {
992 self.keep_alive_counter = self.max_keep_alive_counter;
993 }
994
995 pub fn reset_lifetime_counter(&mut self) {
998 self.lifetime_counter = self.max_lifetime_counter;
999 }
1000
1001 pub fn start_publishing_timer(&mut self) {
1003 self.lifetime_counter -= 1;
1004 trace!("Decrementing life time counter {}", self.lifetime_counter);
1005 }
1006
1007 pub fn subscription_id(&self) -> u32 {
1008 self.subscription_id
1009 }
1010
1011 pub fn lifetime_counter(&self) -> u32 {
1012 self.lifetime_counter
1013 }
1014
1015 #[cfg(test)]
1016 pub(crate) fn set_current_lifetime_count(&mut self, current_lifetime_count: u32) {
1017 self.lifetime_counter = current_lifetime_count;
1018 }
1019
1020 pub fn keep_alive_counter(&self) -> u32 {
1021 self.keep_alive_counter
1022 }
1023
1024 #[cfg(test)]
1025 pub(crate) fn set_keep_alive_counter(&mut self, keep_alive_counter: u32) {
1026 self.keep_alive_counter = keep_alive_counter;
1027 }
1028
1029 #[cfg(test)]
1030 pub(crate) fn state(&self) -> SubscriptionState {
1031 self.state
1032 }
1033
1034 #[cfg(test)]
1035 pub(crate) fn set_state(&mut self, state: SubscriptionState) {
1036 self.state = state;
1037 }
1038
1039 pub fn message_sent(&self) -> bool {
1040 self.first_message_sent
1041 }
1042
1043 #[cfg(test)]
1044 pub(crate) fn set_message_sent(&mut self, message_sent: bool) {
1045 self.first_message_sent = message_sent;
1046 }
1047
1048 pub fn publishing_interval(&self) -> Duration {
1049 self.publishing_interval
1050 }
1051
1052 pub(crate) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
1053 self.publishing_interval = publishing_interval;
1054 self.reset_lifetime_counter();
1055 }
1056
1057 pub fn max_keep_alive_count(&self) -> u32 {
1058 self.max_keep_alive_counter
1059 }
1060
1061 pub(crate) fn set_max_keep_alive_count(&mut self, max_keep_alive_count: u32) {
1062 self.max_keep_alive_counter = max_keep_alive_count;
1063 }
1064
1065 pub fn max_lifetime_count(&self) -> u32 {
1066 self.max_lifetime_counter
1067 }
1068
1069 pub(crate) fn set_max_lifetime_count(&mut self, max_lifetime_count: u32) {
1070 self.max_lifetime_counter = max_lifetime_count;
1071 }
1072
1073 pub fn priority(&self) -> u8 {
1074 self.priority
1075 }
1076
1077 pub(crate) fn set_priority(&mut self, priority: u8) {
1078 self.priority = priority;
1079 }
1080
1081 pub(crate) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
1082 self.publishing_enabled = publishing_enabled;
1083 self.reset_lifetime_counter();
1084 }
1085
1086 pub(crate) fn set_diagnostics_on_drop(&mut self, diagnostics_on_drop: bool) {
1087 self.diagnostics_on_drop = diagnostics_on_drop;
1088 }
1089
1090 fn validate_triggered_items(
1091 &self,
1092 monitored_item_id: u32,
1093 items: &[u32],
1094 ) -> (Vec<StatusCode>, Vec<u32>) {
1095 let is_good_monitored_item =
1097 |i| self.monitored_items.contains_key(i) && *i != monitored_item_id;
1098 let is_good_monitored_item_result = |i| {
1099 if is_good_monitored_item(i) {
1100 StatusCode::Good
1101 } else {
1102 StatusCode::BadMonitoredItemIdInvalid
1103 }
1104 };
1105
1106 let results: Vec<StatusCode> = items.iter().map(is_good_monitored_item_result).collect();
1108 let items: Vec<u32> = items
1109 .iter()
1110 .filter(|i| is_good_monitored_item(i))
1111 .copied()
1112 .collect();
1113
1114 (results, items)
1115 }
1116
1117 pub(crate) fn set_triggering(
1121 &mut self,
1122 monitored_item_id: u32,
1123 items_to_add: &[u32],
1124 items_to_remove: &[u32],
1125 ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
1126 let (add_results, items_to_add) =
1128 self.validate_triggered_items(monitored_item_id, items_to_add);
1129 let (remove_results, items_to_remove) =
1130 self.validate_triggered_items(monitored_item_id, items_to_remove);
1131
1132 if let Some(ref mut monitored_item) = self.monitored_items.get_mut(&monitored_item_id) {
1133 monitored_item.set_triggering(items_to_add.as_slice(), items_to_remove.as_slice());
1135
1136 Ok((add_results, remove_results))
1137 } else {
1138 Err(StatusCode::BadMonitoredItemIdInvalid)
1140 }
1141 }
1142}