1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 time::{Duration, Instant},
4};
5
6use opcua_core::handle::Handle;
7use opcua_nodes::{Event, TypeTree};
8use opcua_types::{DataValue, DateTime, DateTimeUtc, NotificationMessage, StatusCode};
9use tracing::{debug, trace, warn};
10
11use super::monitored_item::{MonitoredItem, Notification};
12
/// The state a [`Subscription`] is currently in.
///
/// The state is advanced by `Subscription::tick`, which selects a transition in
/// `Subscription::get_state_transition` and applies it in
/// `Subscription::handle_state_transition`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum SubscriptionState {
    /// The subscription has expired; entered when the timer fires while the
    /// lifetime counter is at or below one.
    Closed,
    /// Initial state assigned by `Subscription::new`; left on the first tick.
    Creating,
    /// Regular operating state.
    Normal,
    /// A publishing interval elapsed while no publish request was queued, so
    /// nothing could be sent.
    Late,
    /// The subscription is counting down publishing intervals until a
    /// keep-alive message is due.
    KeepAlive,
}
29
/// Pair of ids naming a monitored item together with its subscription.
///
/// Usable as a hash-map key (derives `Eq` and `Hash`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct MonitoredItemHandle {
    /// Id of the subscription (presumably the one owning the item — confirm against callers).
    pub subscription_id: u32,
    /// Id of the monitored item.
    pub monitored_item_id: u32,
}
38
/// Inputs to `Subscription::get_state_transition`, gathered once per tick.
#[derive(Debug)]
pub(crate) struct SubscriptionStateParams {
    /// Result of `Subscription::notifications_available` for this tick.
    pub notifications_available: bool,
    /// Whether the subscription's queue of built notification messages is non-empty.
    pub more_notifications: bool,
    /// Forwarded unchanged from the `publishing_req_queued` argument of `Subscription::tick`.
    pub publishing_req_queued: bool,
}
45
/// Follow-up work that `Subscription::tick` performs after a state transition
/// has been applied.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum UpdateStateAction {
    /// Nothing to do.
    None,
    /// Enqueue a keep-alive notification message.
    ReturnKeepAlive,
    /// Collect notifications from the monitored items and enqueue them.
    ReturnNotifications,
    /// The subscription just left the `Creating` state; `tick` does no further work.
    SubscriptionCreated,
    /// The subscription expired: `tick` clears the monitored items and
    /// enqueues a `BadTimeout` status change.
    SubscriptionExpired,
}
58
/// Outcome of `Subscription::tick` as seen by the caller.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) enum TickResult {
    /// The subscription expired during this tick.
    Expired,
    /// The tick took a path that enqueues a keep-alive or data notifications.
    Enqueued,
    /// Nothing was enqueued.
    None,
}
65
/// Identifies which state-machine transition was selected for a tick.
///
/// Each variant's discriminant equals the numeric suffix in its name.
/// NOTE(review): the numbers presumably correspond to rows of the OPC UA
/// subscription state table — confirm against the specification.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum HandledState {
    /// No transition applies.
    None0 = 0,
    /// Leaving the `Creating` state.
    Create3 = 3,
    // Publish request received while in the `Normal` state.
    Normal4 = 4,
    Normal5 = 5,
    // Publishing timer fired while in the `Normal` state.
    IntervalElapsed6 = 6,
    IntervalElapsed7 = 7,
    IntervalElapsed8 = 8,
    IntervalElapsed9 = 9,
    // Transitions taken from the `Late` state.
    Late10 = 10,
    Late11 = 11,
    Late12 = 12,
    // Transitions taken from the `KeepAlive` state.
    KeepAlive13 = 13,
    KeepAlive14 = 14,
    KeepAlive15 = 15,
    KeepAlive16 = 16,
    KeepAlive17 = 17,
    /// The lifetime counter ran out; the subscription closes.
    Closed27 = 27,
}
90
/// Server-side subscription: a set of monitored items plus the state machine
/// that decides when queued notifications or keep-alives are published.
#[derive(Debug)]
pub struct Subscription {
    /// Subscription id, returned by `id()`.
    id: u32,
    /// Minimum time between two publishing-interval expirations.
    publishing_interval: Duration,
    /// Value `lifetime_counter` is reset to.
    max_lifetime_counter: u32,
    /// Value `keep_alive_counter` is reset to.
    max_keep_alive_counter: u32,
    /// Priority, returned by `priority()`.
    priority: u8,
    /// Monitored items keyed by monitored item id.
    monitored_items: HashMap<u32, MonitoredItem>,
    /// Ids of items that were inserted or accepted a notification and have not
    /// yet been drained by `tick_monitored_items`.
    notified_monitored_items: HashSet<u32>,
    /// Current state of the state machine.
    state: SubscriptionState,
    /// Decremented by `start_publishing_timer`; the subscription closes when
    /// the timer fires with this counter at or below one.
    lifetime_counter: u32,
    /// Counts down publishing intervals in the `KeepAlive` state; a keep-alive
    /// is sent when it has reached one and a publish request is queued.
    keep_alive_counter: u32,
    /// Set once a notification or keep-alive has been produced since creation.
    first_message_sent: bool,
    /// Whether publishing of notifications is enabled.
    publishing_enabled: bool,
    /// One-shot flag set by `set_resend_data`; consumed the next time
    /// notifications are returned.
    resend_data: bool,
    /// Source of notification message sequence numbers, starting at 1.
    sequence_number: Handle,
    /// Instant at which the publishing interval was last observed to elapse.
    last_time_publishing_interval_elapsed: Instant,
    /// Built messages waiting to be fetched through `take_notification`.
    notifications: VecDeque<NotificationMessage>,
    /// Queue length at which the oldest queued message is dropped.
    max_queued_notifications: usize,
    /// Maximum notifications per message; `0` disables the limit.
    max_notifications_per_publish: usize,
}
131
/// Why `Subscription::tick` is being invoked.
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum TickReason {
    /// A publish request was received.
    ReceivePublishRequest,
    /// The periodic tick timer fired.
    TickTimerFired,
}
137
138impl Subscription {
139 #[allow(clippy::too_many_arguments)]
140 pub(super) fn new(
141 id: u32,
142 publishing_enabled: bool,
143 publishing_interval: Duration,
144 lifetime_counter: u32,
145 keep_alive_counter: u32,
146 priority: u8,
147 max_queued_notifications: usize,
148 max_notifications_per_publish: u64,
149 ) -> Self {
150 Self {
151 id,
152 publishing_interval,
153 max_lifetime_counter: lifetime_counter,
154 max_keep_alive_counter: keep_alive_counter,
155 priority,
156 monitored_items: HashMap::new(),
157 notified_monitored_items: HashSet::new(),
158 state: SubscriptionState::Creating,
160 lifetime_counter,
161 keep_alive_counter,
162 first_message_sent: false,
163 resend_data: false,
164 publishing_enabled,
165 sequence_number: Handle::new(1),
167 last_time_publishing_interval_elapsed: Instant::now(),
168 notifications: VecDeque::new(),
169 max_queued_notifications,
170 max_notifications_per_publish: max_notifications_per_publish as usize,
171 }
172 }
173
    /// Number of monitored items in this subscription.
    pub fn len(&self) -> usize {
        self.monitored_items.len()
    }
178
    /// Whether this subscription has no monitored items.
    pub fn is_empty(&self) -> bool {
        self.monitored_items.is_empty()
    }
183
    /// Mutable access to the monitored item with the given id, if present.
    pub(super) fn get_mut(&mut self, id: &u32) -> Option<&mut MonitoredItem> {
        self.monitored_items.get_mut(id)
    }
187
    /// Shared access to the monitored item with the given id, if present.
    pub fn get(&self, id: &u32) -> Option<&MonitoredItem> {
        self.monitored_items.get(id)
    }
192
    /// Whether a monitored item with the given id exists in this subscription.
    pub fn contains_key(&self, id: &u32) -> bool {
        self.monitored_items.contains_key(id)
    }
197
    /// Iterate over all monitored items, in the map's unspecified order.
    pub fn items(&self) -> impl Iterator<Item = &MonitoredItem> {
        self.monitored_items.values()
    }
202
    /// Remove and yield every `(id, monitored item)` pair, leaving the
    /// subscription without monitored items.
    pub(super) fn drain(&mut self) -> impl Iterator<Item = (u32, MonitoredItem)> + '_ {
        self.monitored_items.drain()
    }
206
    /// Request a resend: the flag is consumed (reset to `false`) the next time
    /// `tick` returns notifications.
    pub fn set_resend_data(&mut self) {
        self.resend_data = true;
    }
212
    /// Remove and return the monitored item with the given id, if present.
    pub(super) fn remove(&mut self, id: &u32) -> Option<MonitoredItem> {
        self.monitored_items.remove(id)
    }
216
217 pub(super) fn insert(&mut self, id: u32, item: MonitoredItem) {
218 self.monitored_items.insert(id, item);
219 self.notified_monitored_items.insert(id);
220 }
221
222 pub fn notify_data_value(&mut self, id: &u32, value: DataValue, now: &DateTime) {
224 if let Some(item) = self.monitored_items.get_mut(id) {
225 if item.notify_data_value(value, now, false) {
226 self.notified_monitored_items.insert(*id);
227 }
228 }
229 }
230
231 pub fn notify_event(&mut self, id: &u32, event: &dyn Event, type_tree: &dyn TypeTree) {
233 if let Some(item) = self.monitored_items.get_mut(id) {
234 if item.notify_event(event, type_tree) {
235 self.notified_monitored_items.insert(*id);
236 }
237 }
238 }
239
240 fn test_and_set_publishing_interval_elapsed(&mut self, now: Instant) -> bool {
243 let elapsed = now - self.last_time_publishing_interval_elapsed;
246 if elapsed >= self.publishing_interval {
247 self.last_time_publishing_interval_elapsed = now;
248 true
249 } else {
250 false
251 }
252 }
253
254 fn get_state_transition(
255 &self,
256 tick_reason: TickReason,
257 p: SubscriptionStateParams,
258 ) -> HandledState {
259 #[allow(clippy::nonminimal_bool)]
264 match (self.state, tick_reason) {
265 (SubscriptionState::Creating, _) => HandledState::Create3,
266 (SubscriptionState::Normal, TickReason::ReceivePublishRequest)
267 if self.publishing_enabled || !self.publishing_enabled && !p.more_notifications =>
268 {
269 HandledState::Normal4
270 }
271 (SubscriptionState::Normal, TickReason::ReceivePublishRequest)
272 if self.publishing_enabled && p.more_notifications =>
273 {
274 HandledState::Normal5
275 }
276 (SubscriptionState::Normal, TickReason::TickTimerFired)
277 if p.publishing_req_queued
278 && self.publishing_enabled
279 && p.notifications_available =>
280 {
281 HandledState::IntervalElapsed6
282 }
283 (SubscriptionState::Normal, TickReason::TickTimerFired)
284 if p.publishing_req_queued
285 && !self.first_message_sent
286 && (!self.publishing_enabled
287 || self.publishing_enabled && !p.more_notifications) =>
288 {
289 HandledState::IntervalElapsed7
290 }
291 (SubscriptionState::Normal, TickReason::TickTimerFired)
292 if !p.publishing_req_queued
293 && (!self.first_message_sent
294 || self.publishing_enabled && p.notifications_available) =>
295 {
296 HandledState::IntervalElapsed8
297 }
298 (SubscriptionState::Normal, TickReason::TickTimerFired)
299 if self.first_message_sent
300 && (!self.publishing_enabled
301 || self.publishing_enabled && !p.more_notifications) =>
302 {
303 HandledState::IntervalElapsed9
304 }
305 (SubscriptionState::Late, TickReason::ReceivePublishRequest)
306 if self.publishing_enabled
307 && (p.notifications_available || p.more_notifications) =>
308 {
309 HandledState::Late10
310 }
311 (SubscriptionState::Late, TickReason::ReceivePublishRequest)
312 if !self.publishing_enabled
313 || self.publishing_enabled
314 && !p.notifications_available
315 && !p.more_notifications =>
316 {
317 HandledState::Late11
318 }
319 (SubscriptionState::Late, TickReason::TickTimerFired) if self.lifetime_counter > 1 => {
322 HandledState::Late12
323 }
324 (SubscriptionState::KeepAlive, TickReason::ReceivePublishRequest) => {
325 HandledState::KeepAlive13
326 }
327 (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
328 if self.publishing_enabled
329 && p.notifications_available
330 && p.publishing_req_queued =>
331 {
332 HandledState::KeepAlive14
333 }
334 (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
335 if p.publishing_req_queued
336 && self.keep_alive_counter == 1
337 && (!self.publishing_enabled
338 || self.publishing_enabled && !p.notifications_available) =>
339 {
340 HandledState::KeepAlive15
341 }
342 (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
343 if self.keep_alive_counter > 1
344 && (!self.publishing_enabled
345 || self.publishing_enabled && !p.notifications_available) =>
346 {
347 HandledState::KeepAlive16
348 }
349 (SubscriptionState::KeepAlive, TickReason::TickTimerFired)
350 if !p.publishing_req_queued
351 && (self.keep_alive_counter == 1
352 || self.keep_alive_counter > 1
353 && self.publishing_enabled
354 && p.notifications_available) =>
355 {
356 HandledState::KeepAlive17
357 }
358 (
360 SubscriptionState::Normal | SubscriptionState::Late | SubscriptionState::KeepAlive,
361 TickReason::TickTimerFired,
362 ) if self.lifetime_counter <= 1 => HandledState::Closed27,
363 _ => HandledState::None0,
364 }
365 }
366
    /// Apply a transition chosen by `get_state_transition`.
    ///
    /// Updates `state`, the lifetime and keep-alive counters and
    /// `first_message_sent` as required, and returns the follow-up action that
    /// `tick` must perform.
    fn handle_state_transition(&mut self, transition: HandledState) -> UpdateStateAction {
        match transition {
            HandledState::None0 => UpdateStateAction::None,
            HandledState::Create3 => {
                // Creation finished; no message has been sent yet.
                self.state = SubscriptionState::Normal;
                self.first_message_sent = false;
                UpdateStateAction::SubscriptionCreated
            }
            HandledState::Normal4 => {
                // Nothing changes inside the subscription for this transition.
                UpdateStateAction::None
            }
            HandledState::Normal5 => {
                self.reset_lifetime_counter();
                UpdateStateAction::ReturnNotifications
            }
            HandledState::IntervalElapsed6 => {
                // Reset first, then count this interval (counter ends at max - 1).
                self.reset_lifetime_counter();
                self.start_publishing_timer();
                self.first_message_sent = true;
                UpdateStateAction::ReturnNotifications
            }
            HandledState::IntervalElapsed7 => {
                self.reset_lifetime_counter();
                self.start_publishing_timer();
                self.first_message_sent = true;
                UpdateStateAction::ReturnKeepAlive
            }
            HandledState::IntervalElapsed8 => {
                // Nothing could be sent; the lifetime keeps counting down.
                self.start_publishing_timer();
                self.state = SubscriptionState::Late;
                UpdateStateAction::None
            }
            HandledState::IntervalElapsed9 => {
                // Begin a fresh keep-alive countdown.
                self.start_publishing_timer();
                self.reset_keep_alive_counter();
                self.state = SubscriptionState::KeepAlive;
                UpdateStateAction::None
            }
            HandledState::Late10 => {
                self.reset_lifetime_counter();
                self.first_message_sent = true;
                self.state = SubscriptionState::Normal;
                UpdateStateAction::ReturnNotifications
            }
            HandledState::Late11 => {
                self.reset_lifetime_counter();
                self.first_message_sent = true;
                self.state = SubscriptionState::KeepAlive;
                UpdateStateAction::ReturnKeepAlive
            }
            HandledState::Late12 => {
                // Stay late; only the lifetime counter moves.
                self.start_publishing_timer();
                self.state = SubscriptionState::Late;
                UpdateStateAction::None
            }
            HandledState::KeepAlive13 => {
                // Nothing changes inside the subscription for this transition.
                UpdateStateAction::None
            }
            HandledState::KeepAlive14 => {
                self.reset_lifetime_counter();
                self.start_publishing_timer();
                self.first_message_sent = true;
                self.state = SubscriptionState::Normal;
                UpdateStateAction::ReturnNotifications
            }
            HandledState::KeepAlive15 => {
                // Countdown finished: send a keep-alive and restart the countdown.
                self.start_publishing_timer();
                self.reset_keep_alive_counter();
                UpdateStateAction::ReturnKeepAlive
            }
            HandledState::KeepAlive16 => {
                self.start_publishing_timer();
                // Cannot underflow: this transition is only selected while the
                // keep-alive counter is greater than one.
                self.keep_alive_counter -= 1;
                UpdateStateAction::None
            }
            HandledState::KeepAlive17 => {
                self.start_publishing_timer();
                self.state = SubscriptionState::Late;
                UpdateStateAction::None
            }
            HandledState::Closed27 => {
                self.state = SubscriptionState::Closed;
                UpdateStateAction::SubscriptionExpired
            }
        }
    }
455
456 fn notifications_available(&self, resend_data: bool) -> bool {
457 if !self.notified_monitored_items.is_empty() {
458 true
459 } else if resend_data {
460 self.monitored_items.iter().any(|it| it.1.has_last_value())
461 } else {
462 false
463 }
464 }
465
466 pub(super) fn tick(
467 &mut self,
468 now: &DateTimeUtc,
469 now_instant: Instant,
470 tick_reason: TickReason,
471 publishing_req_queued: bool,
472 ) -> TickResult {
473 let publishing_interval_elapsed = match tick_reason {
474 TickReason::ReceivePublishRequest => false,
475 TickReason::TickTimerFired => {
476 if self.state == SubscriptionState::Creating {
477 true
478 } else {
479 self.test_and_set_publishing_interval_elapsed(now_instant)
480 }
481 }
482 };
483
484 if matches!(tick_reason, TickReason::TickTimerFired) && !publishing_interval_elapsed {
486 return TickResult::None;
487 }
488 let transition = self.get_state_transition(
490 tick_reason,
491 SubscriptionStateParams {
492 notifications_available: self.notifications_available(self.resend_data),
493 more_notifications: !self.notifications.is_empty(),
494 publishing_req_queued,
495 },
496 );
497 let action = self.handle_state_transition(transition);
498
499 match action {
500 UpdateStateAction::None => TickResult::None,
501 UpdateStateAction::ReturnKeepAlive => {
502 let notification = NotificationMessage::keep_alive(
503 self.sequence_number.peek_next(),
508 DateTime::from(*now),
509 );
510 self.enqueue_notification(notification);
511 TickResult::Enqueued
512 }
513 UpdateStateAction::ReturnNotifications => {
514 let resend_data = std::mem::take(&mut self.resend_data);
515 let messages = self.tick_monitored_items(now, resend_data);
516 for msg in messages {
517 self.enqueue_notification(msg);
518 }
519 TickResult::Enqueued
520 }
521 UpdateStateAction::SubscriptionCreated => TickResult::None,
522 UpdateStateAction::SubscriptionExpired => {
523 debug!("Subscription status change to closed / timeout");
524 self.monitored_items.clear();
525 let notification = NotificationMessage::status_change(
526 self.sequence_number.next(),
527 DateTime::from(*now),
528 StatusCode::BadTimeout,
529 );
530 self.enqueue_notification(notification);
531 TickResult::Expired
532 }
533 }
534 }
535
536 fn enqueue_notification(&mut self, notification: NotificationMessage) {
537 if self.notifications.len() >= self.max_queued_notifications {
538 warn!("Maximum number of queued notifications exceeded, dropping oldest. Subscription ID: {}", self.id);
539 self.notifications.pop_front();
540 }
541
542 self.notifications.push_back(notification);
544 }
545
    /// Remove and return the oldest queued notification message, if any.
    pub(super) fn take_notification(&mut self) -> Option<NotificationMessage> {
        self.notifications.pop_front()
    }
549
    /// Whether any notification message is still queued.
    pub(super) fn more_notifications(&self) -> bool {
        !self.notifications.is_empty()
    }
553
554 pub(super) fn ready_to_remove(&self) -> bool {
555 self.state == SubscriptionState::Closed && self.notifications.is_empty()
556 }
557
558 fn handle_triggers(
559 &mut self,
560 now: &DateTimeUtc,
561 triggers: Vec<(u32, u32)>,
562 notifications: &mut Vec<Notification>,
563 messages: &mut Vec<NotificationMessage>,
564 ) {
565 for (triggering_item, item_id) in triggers {
566 let Some(item) = self.monitored_items.get_mut(&item_id) else {
567 if let Some(item) = self.monitored_items.get_mut(&triggering_item) {
568 item.remove_dead_trigger(item_id);
569 }
570 continue;
571 };
572
573 while let Some(notif) = item.pop_notification() {
574 notifications.push(notif);
575 if notifications.len() >= self.max_notifications_per_publish
576 && self.max_notifications_per_publish > 0
577 {
578 messages.push(Self::make_notification_message(
579 self.sequence_number.next(),
580 std::mem::take(notifications),
581 now,
582 ));
583 }
584 }
585 }
586 }
587
588 fn make_notification_message(
589 next_sequence_number: u32,
590 notifications: Vec<Notification>,
591 now: &DateTimeUtc,
592 ) -> NotificationMessage {
593 let mut data_change_notifications = Vec::new();
594 let mut event_notifications = Vec::new();
595
596 for notif in notifications {
597 match notif {
598 Notification::MonitoredItemNotification(n) => data_change_notifications.push(n),
599 Notification::Event(n) => event_notifications.push(n),
600 }
601 }
602
603 NotificationMessage::data_change(
604 next_sequence_number,
605 DateTime::from(*now),
606 data_change_notifications,
607 event_notifications,
608 )
609 }
610
611 #[allow(clippy::too_many_arguments)]
612 fn tick_monitored_item(
613 monitored_item: &mut MonitoredItem,
614 now: &DateTimeUtc,
615 resend_data: bool,
616 max_notifications: usize,
617 triggers: &mut Vec<(u32, u32)>,
618 notifications: &mut Vec<Notification>,
619 messages: &mut Vec<NotificationMessage>,
620 sequence_numbers: &mut Handle,
621 ) {
622 monitored_item.maybe_enqueue_skipped_value(&(*now).into());
623
624 if monitored_item.is_sampling() && monitored_item.has_new_notifications() {
625 triggers.extend(
626 monitored_item
627 .triggered_items()
628 .iter()
629 .copied()
630 .map(|id| (monitored_item.id(), id)),
631 );
632 }
633
634 if monitored_item.is_reporting() {
635 if resend_data {
636 monitored_item.add_current_value_to_queue();
637 }
638 if monitored_item.has_notifications() {
639 while let Some(notif) = monitored_item.pop_notification() {
640 notifications.push(notif);
641 if notifications.len() >= max_notifications && max_notifications > 0 {
642 messages.push(Self::make_notification_message(
643 sequence_numbers.next(),
644 std::mem::take(notifications),
645 now,
646 ));
647 }
648 }
649 }
650 }
651 }
652
653 fn tick_monitored_items(
654 &mut self,
655 now: &DateTimeUtc,
656 resend_data: bool,
657 ) -> Vec<NotificationMessage> {
658 let mut notifications = Vec::new();
659 let mut messages = Vec::new();
660 let mut triggers = Vec::new();
661
662 if resend_data {
664 for monitored_item in self.monitored_items.values_mut() {
665 Self::tick_monitored_item(
666 monitored_item,
667 now,
668 resend_data,
669 self.max_notifications_per_publish,
670 &mut triggers,
671 &mut notifications,
672 &mut messages,
673 &mut self.sequence_number,
674 );
675 }
676 } else {
677 for item_id in self.notified_monitored_items.drain() {
678 let Some(monitored_item) = self.monitored_items.get_mut(&item_id) else {
679 continue;
680 };
681 Self::tick_monitored_item(
682 monitored_item,
683 now,
684 resend_data,
685 self.max_notifications_per_publish,
686 &mut triggers,
687 &mut notifications,
688 &mut messages,
689 &mut self.sequence_number,
690 );
691 }
692 }
693
694 self.handle_triggers(now, triggers, &mut notifications, &mut messages);
695
696 if !notifications.is_empty() {
697 messages.push(Self::make_notification_message(
698 self.sequence_number.next(),
699 notifications,
700 now,
701 ));
702 }
703
704 messages
705 }
706
    /// Reset the keep-alive counter to its configured maximum.
    pub(super) fn reset_keep_alive_counter(&mut self) {
        self.keep_alive_counter = self.max_keep_alive_counter;
    }
713
    /// Reset the lifetime counter to its configured maximum.
    pub(super) fn reset_lifetime_counter(&mut self) {
        self.lifetime_counter = self.max_lifetime_counter;
    }
719
    /// Account for one publishing interval: decrement the lifetime counter,
    /// saturating at zero, and trace the new value.
    ///
    /// Despite the name, no timer is started here.
    pub(super) fn start_publishing_timer(&mut self) {
        self.lifetime_counter = self.lifetime_counter.saturating_sub(1);
        trace!("Decrementing life time counter {}", self.lifetime_counter);
    }
725
    /// The subscription id.
    pub fn id(&self) -> u32 {
        self.id
    }
730
    /// The subscription priority.
    pub fn priority(&self) -> u8 {
        self.priority
    }
735
    /// Change the publishing interval and reset the lifetime counter to its
    /// configured maximum.
    pub(super) fn set_publishing_interval(&mut self, publishing_interval: Duration) {
        self.publishing_interval = publishing_interval;
        self.reset_lifetime_counter();
    }
740
    /// Change the maximum lifetime counter.
    ///
    /// The running counter itself is not modified here; it picks up the new
    /// maximum on its next reset.
    pub(super) fn set_max_lifetime_counter(&mut self, max_lifetime_counter: u32) {
        self.max_lifetime_counter = max_lifetime_counter;
    }
744
    /// Change the maximum keep-alive counter.
    ///
    /// The running counter itself is not modified here; it picks up the new
    /// maximum on its next reset.
    pub(super) fn set_max_keep_alive_counter(&mut self, max_keep_alive_counter: u32) {
        self.max_keep_alive_counter = max_keep_alive_counter;
    }
748
    /// Change the subscription priority.
    pub(super) fn set_priority(&mut self, priority: u8) {
        self.priority = priority;
    }
752
753 pub(super) fn set_max_notifications_per_publish(&mut self, max_notifications_per_publish: u64) {
754 self.max_notifications_per_publish = max_notifications_per_publish as usize;
755 }
756
    /// Enable or disable publishing for this subscription.
    pub(super) fn set_publishing_enabled(&mut self, publishing_enabled: bool) {
        self.publishing_enabled = publishing_enabled;
    }
760
    /// The publishing interval.
    pub fn publishing_interval(&self) -> Duration {
        self.publishing_interval
    }
765
    /// Whether publishing is enabled.
    pub fn publishing_enabled(&self) -> bool {
        self.publishing_enabled
    }
770
    /// Queue length at which the oldest queued notification message is dropped.
    pub fn max_queued_notifications(&self) -> usize {
        self.max_queued_notifications
    }
775
    /// Maximum number of notifications packed into one message; `0` means no limit.
    pub fn max_notifications_per_publish(&self) -> usize {
        self.max_notifications_per_publish
    }
781
    /// The current state of the subscription.
    pub fn state(&self) -> SubscriptionState {
        self.state
    }
786}
787
788#[cfg(test)]
789mod tests {
790 use std::time::{Duration, Instant};
791
792 use chrono::{TimeDelta, Utc};
793
794 use crate::{
795 subscriptions::monitored_item::{
796 tests::new_monitored_item, FilterType, Notification, SamplingInterval,
797 },
798 SubscriptionState,
799 };
800 use opcua_types::{
801 match_extension_object_owned, AttributeId, DataChangeNotification, DataValue, DateTime,
802 DateTimeUtc, EventNotificationList, MonitoringMode, NodeId, NotificationMessage,
803 ReadValueId, StatusChangeNotification, StatusCode, Variant,
804 };
805
806 use super::{Subscription, TickReason};
807
    /// Flatten every notification carried by `message` into one list.
    ///
    /// Panics if an entry is neither a `DataChangeNotification` nor an
    /// `EventNotificationList`.
    fn get_notifications(message: &NotificationMessage) -> Vec<Notification> {
        let mut res = Vec::new();
        for it in message.notification_data.iter().flatten() {
            // The macro consumes its argument, so work on a clone.
            let it = it.clone();
            match_extension_object_owned!(it,
                notif: DataChangeNotification => {
                    for n in notif.monitored_items.into_iter().flatten() {
                        res.push(Notification::MonitoredItemNotification(n));
                    }
                },
                notif: EventNotificationList => {
                    for n in notif.events.into_iter().flatten() {
                        res.push(Notification::Event(n));
                    }
                },
                _ => panic!("Wrong message type"),
            )
        }
        res
    }
828
829 fn offset(time: DateTimeUtc, time_inst: Instant, ms: u64) -> (DateTimeUtc, Instant) {
830 (
831 time + chrono::Duration::try_milliseconds(ms as i64).unwrap(),
832 time_inst + Duration::from_millis(ms),
833 )
834 }
835
836 #[test]
837 fn tick() {
838 let mut sub = Subscription::new(1, true, Duration::from_millis(100), 100, 20, 1, 100, 1000);
839 let start = Instant::now();
840 let start_dt = Utc::now();
841
842 sub.last_time_publishing_interval_elapsed = start;
843
844 assert_eq!(sub.state, SubscriptionState::Creating);
846 sub.tick(&start_dt, start, TickReason::TickTimerFired, true);
847 assert_eq!(sub.state, SubscriptionState::Normal);
848 assert!(!sub.first_message_sent);
849
850 sub.tick(&start_dt, start, TickReason::TickTimerFired, true);
852 assert_eq!(sub.state, SubscriptionState::Normal);
853 assert!(!sub.first_message_sent);
854
855 sub.insert(
857 1,
858 new_monitored_item(
859 1,
860 ReadValueId {
861 node_id: NodeId::null(),
862 attribute_id: AttributeId::Value as u32,
863 ..Default::default()
864 },
865 MonitoringMode::Reporting,
866 FilterType::None,
867 SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
868 false,
869 Some(DataValue::new_now(123)),
870 ),
871 );
872 let (time, time_inst) = offset(start_dt, start, 100);
874 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
875 assert_eq!(sub.state, SubscriptionState::Normal);
876 assert!(sub.first_message_sent);
877 let notif = sub.take_notification().unwrap();
878 let its = get_notifications(¬if);
879 assert_eq!(its.len(), 1);
880 assert_eq!(notif.sequence_number, 1);
881 let Notification::MonitoredItemNotification(m) = &its[0] else {
882 panic!("Wrong notification type");
883 };
884 assert_eq!(m.value.value, Some(Variant::Int32(123)));
885
886 let (time, time_inst) = offset(start_dt, start, 200);
888
889 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
890 assert_eq!(sub.state, SubscriptionState::KeepAlive);
892 assert_eq!(sub.lifetime_counter, 98);
893 assert!(sub.first_message_sent);
894 assert!(sub.take_notification().is_none());
895
896 sub.notify_data_value(
898 &1,
899 DataValue::new_at(
900 321,
901 DateTime::from(start_dt + chrono::Duration::try_milliseconds(300).unwrap()),
902 ),
903 &DateTime::now(),
904 );
905 let (time, time_inst) = offset(start_dt, start, 300);
906 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
907 assert_eq!(sub.state, SubscriptionState::Normal);
909 assert!(sub.first_message_sent);
910 assert_eq!(sub.lifetime_counter, 99);
911 let notif = sub.take_notification().unwrap();
912 let its = get_notifications(¬if);
913 assert_eq!(notif.sequence_number, 2);
914 assert_eq!(its.len(), 1);
915 let Notification::MonitoredItemNotification(m) = &its[0] else {
916 panic!("Wrong notification type");
917 };
918 assert_eq!(m.value.value, Some(Variant::Int32(321)));
919
920 for i in 0..20 {
921 let (time, time_inst) = offset(start_dt, start, 1000 + i * 100);
922 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
923 assert_eq!(sub.state, SubscriptionState::KeepAlive);
924 assert_eq!(sub.lifetime_counter, (99 - i - 1) as u32);
925 assert_eq!(sub.keep_alive_counter, (20 - i) as u32);
926 assert!(sub.take_notification().is_none());
927 }
928 assert_eq!(sub.lifetime_counter, 79);
929 assert_eq!(sub.keep_alive_counter, 1);
930
931 let (time, time_inst) = offset(start_dt, start, 3000);
933 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
934 assert_eq!(sub.state, SubscriptionState::KeepAlive);
935 assert_eq!(sub.lifetime_counter, 78);
936 assert_eq!(sub.keep_alive_counter, 20);
937 let notif = sub.take_notification().unwrap();
938 let its = get_notifications(¬if);
939 assert!(its.is_empty());
940 assert_eq!(notif.sequence_number, 3);
942
943 for i in 0..19 {
945 let (time, time_inst) = offset(start_dt, start, 3100 + i * 100);
946 sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
947 assert_eq!(sub.state, SubscriptionState::KeepAlive);
948 assert_eq!(sub.lifetime_counter, (78 - i - 1) as u32);
949 }
950
951 for i in 0..58 {
953 let (time, time_inst) = offset(start_dt, start, 5100 + i * 100);
954 sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
955 assert_eq!(sub.state, SubscriptionState::Late);
956 assert_eq!(sub.lifetime_counter, (58 - i) as u32);
957 }
958 assert_eq!(sub.lifetime_counter, 1);
959
960 let (time, time_inst) = offset(start_dt, start, 20000);
961 sub.tick(&time, time_inst, TickReason::TickTimerFired, false);
962 assert_eq!(sub.state, SubscriptionState::Closed);
963 let notif = sub.take_notification().unwrap();
964 assert_eq!(notif.sequence_number, 3);
965 assert_eq!(1, notif.notification_data.as_ref().unwrap().len());
966 let status_change = notif.notification_data.as_ref().unwrap()[0]
967 .inner_as::<StatusChangeNotification>()
968 .unwrap();
969 assert_eq!(status_change.status, StatusCode::BadTimeout);
970 }
971
972 #[test]
973 fn monitored_item_triggers() {
974 let mut sub = Subscription::new(1, true, Duration::from_millis(100), 100, 20, 1, 100, 1000);
975 let start = Instant::now();
976 let start_dt = Utc::now();
977
978 sub.last_time_publishing_interval_elapsed = start;
979 for i in 0..4 {
980 sub.insert(
981 i + 1,
982 new_monitored_item(
983 i + 1,
984 ReadValueId {
985 node_id: NodeId::null(),
986 attribute_id: AttributeId::Value as u32,
987 ..Default::default()
988 },
989 if i == 0 {
990 MonitoringMode::Reporting
991 } else if i == 3 {
992 MonitoringMode::Disabled
993 } else {
994 MonitoringMode::Sampling
995 },
996 FilterType::None,
997 SamplingInterval::NonZero(TimeDelta::milliseconds(100)),
998 false,
999 Some(DataValue::new_at(0, start_dt.into())),
1000 ),
1001 );
1002 }
1003 sub.get_mut(&1).unwrap().set_triggering(&[1, 2, 3, 4], &[]);
1004 let (otime, time_inst) = offset(start_dt, start, 100);
1006 let time = otime.into();
1007 sub.notify_data_value(&2, DataValue::new_at(1, time), &time);
1008 sub.notify_data_value(&3, DataValue::new_at(1, time), &time);
1009 sub.notify_data_value(&4, DataValue::new_at(1, time), &time);
1010
1011 sub.tick(&otime, time_inst, TickReason::TickTimerFired, true);
1013 assert!(sub.take_notification().is_none());
1014
1015 sub.notify_data_value(&1, DataValue::new_at(1, time.into()), &time);
1017 let (time, time_inst) = offset(start_dt, start, 200);
1018 sub.tick(&time, time_inst, TickReason::TickTimerFired, true);
1019 let notif = sub.take_notification().unwrap();
1020 let its = get_notifications(¬if);
1021 assert_eq!(its.len(), 6);
1022 for it in its {
1023 let Notification::MonitoredItemNotification(_m) = it else {
1024 panic!("Wrong notification type");
1025 };
1026 }
1027 }
1028}