1use std::{
2 collections::VecDeque,
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use super::{
8 monitored_item::MonitoredItem,
9 subscription::{MonitoredItemHandle, Subscription, TickReason, TickResult},
10 CreateMonitoredItem, NonAckedPublish, PendingPublish, PersistentSessionKey,
11};
12use hashbrown::{HashMap, HashSet};
13use opcua_nodes::{Event, TypeTree};
14
15use crate::{
16 info::ServerInfo,
17 node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, TypeTreeForUserStatic},
18 session::instance::Session,
19 SubscriptionLimits,
20};
21use opcua_core::sync::RwLock;
22use opcua_types::{
23 AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataValue, DateTime,
24 DateTimeUtc, ExtensionObject, ModifySubscriptionRequest, ModifySubscriptionResponse,
25 MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoredItemModifyResult,
26 MonitoringMode, NodeId, NotificationMessage, PublishRequest, PublishResponse, RepublishRequest,
27 RepublishResponse, ResponseHeader, ServiceFault, SetPublishingModeRequest,
28 SetPublishingModeResponse, StatusCode, TimestampsToReturn,
29};
30
31pub struct SessionSubscriptions {
34 user_token: PersistentSessionKey,
36 subscriptions: HashMap<u32, Subscription>,
38 publish_request_queue: VecDeque<PendingPublish>,
40 retransmission_queue: VecDeque<NonAckedPublish>,
42 limits: SubscriptionLimits,
44
45 session: Arc<RwLock<Session>>,
47 type_tree_for_user: Arc<dyn TypeTreeForUserStatic>,
49}
50
51impl SessionSubscriptions {
52 pub(super) fn new(
53 limits: SubscriptionLimits,
54 user_token: PersistentSessionKey,
55 session: Arc<RwLock<Session>>,
56 type_tree_for_user: Arc<dyn TypeTreeForUserStatic>,
57 ) -> Self {
58 Self {
59 user_token,
60 subscriptions: HashMap::new(),
61 publish_request_queue: VecDeque::new(),
62 retransmission_queue: VecDeque::new(),
63 limits,
64 session,
65 type_tree_for_user,
66 }
67 }
68
69 fn max_publish_requests(&self) -> usize {
70 self.limits
71 .max_pending_publish_requests
72 .min(self.subscriptions.len() * self.limits.max_publish_requests_per_subscription)
73 .max(1)
74 }
75
76 pub(super) fn is_ready_to_delete(&self) -> bool {
77 self.subscriptions.is_empty() && self.publish_request_queue.is_empty()
78 }
79
80 #[allow(clippy::result_large_err)]
81 pub(super) fn insert(
82 &mut self,
83 subscription: Subscription,
84 notifs: Vec<NonAckedPublish>,
85 ) -> Result<(), (StatusCode, Subscription, Vec<NonAckedPublish>)> {
86 if self.subscriptions.len() >= self.limits.max_subscriptions_per_session {
87 return Err((StatusCode::BadTooManySubscriptions, subscription, notifs));
88 }
89 self.subscriptions.insert(subscription.id(), subscription);
90 for notif in notifs {
91 self.retransmission_queue.push_back(notif);
92 }
93 Ok(())
94 }
95
96 pub fn contains(&self, sub_id: u32) -> bool {
99 self.subscriptions.contains_key(&sub_id)
100 }
101
102 pub fn subscription_ids(&self) -> Vec<u32> {
104 self.subscriptions.keys().copied().collect()
105 }
106
107 pub(super) fn remove(
108 &mut self,
109 subscription_id: u32,
110 ) -> (Option<Subscription>, Vec<NonAckedPublish>) {
111 let mut notifs = Vec::new();
112 let mut idx = 0;
113 while idx < self.retransmission_queue.len() {
114 if self.retransmission_queue[idx].subscription_id == subscription_id {
115 notifs.push(self.retransmission_queue.remove(idx).unwrap());
116 } else {
117 idx += 1;
118 }
119 }
120
121 (self.subscriptions.remove(&subscription_id), notifs)
122 }
123
124 pub fn get_mut(&mut self, subscription_id: u32) -> Option<&mut Subscription> {
126 self.subscriptions.get_mut(&subscription_id)
127 }
128
129 pub fn get(&self, subscription_id: u32) -> Option<&Subscription> {
131 self.subscriptions.get(&subscription_id)
132 }
133
134 pub(super) fn create_subscription(
135 &mut self,
136 request: &CreateSubscriptionRequest,
137 info: &ServerInfo,
138 ) -> Result<CreateSubscriptionResponse, StatusCode> {
139 if self.subscriptions.len() >= self.limits.max_subscriptions_per_session {
140 return Err(StatusCode::BadTooManySubscriptions);
141 }
142 let subscription_id = info.subscription_id_handle.next();
143
144 let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
145 Self::revise_subscription_values(
146 info,
147 request.requested_publishing_interval,
148 request.requested_max_keep_alive_count,
149 request.requested_lifetime_count,
150 );
151
152 let subscription = Subscription::new(
153 subscription_id,
154 request.publishing_enabled,
155 Duration::from_millis(revised_publishing_interval as u64),
156 revised_lifetime_count,
157 revised_max_keep_alive_count,
158 request.priority,
159 self.limits.max_queued_notifications,
160 self.revise_max_notifications_per_publish(request.max_notifications_per_publish),
161 );
162 self.subscriptions.insert(subscription.id(), subscription);
163 Ok(CreateSubscriptionResponse {
164 response_header: ResponseHeader::new_good(&request.request_header),
165 subscription_id,
166 revised_publishing_interval,
167 revised_lifetime_count,
168 revised_max_keep_alive_count,
169 })
170 }
171
172 pub(super) fn modify_subscription(
173 &mut self,
174 request: &ModifySubscriptionRequest,
175 info: &ServerInfo,
176 ) -> Result<ModifySubscriptionResponse, StatusCode> {
177 let max_notifications_per_publish =
178 self.revise_max_notifications_per_publish(request.max_notifications_per_publish);
179 let Some(subscription) = self.subscriptions.get_mut(&request.subscription_id) else {
180 return Err(StatusCode::BadSubscriptionIdInvalid);
181 };
182
183 let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
184 Self::revise_subscription_values(
185 info,
186 request.requested_publishing_interval,
187 request.requested_max_keep_alive_count,
188 request.requested_lifetime_count,
189 );
190
191 subscription.set_publishing_interval(Duration::from_micros(
192 (revised_publishing_interval * 1000.0) as u64,
193 ));
194 subscription.set_max_keep_alive_counter(revised_max_keep_alive_count);
195 subscription.set_max_lifetime_counter(revised_lifetime_count);
196 subscription.set_priority(request.priority);
197 subscription.reset_lifetime_counter();
198 subscription.reset_keep_alive_counter();
199 subscription.set_max_notifications_per_publish(max_notifications_per_publish);
200
201 Ok(ModifySubscriptionResponse {
202 response_header: ResponseHeader::new_good(&request.request_header),
203 revised_publishing_interval,
204 revised_lifetime_count,
205 revised_max_keep_alive_count,
206 })
207 }
208
209 pub(super) fn set_publishing_mode(
210 &mut self,
211 request: &SetPublishingModeRequest,
212 ) -> Result<SetPublishingModeResponse, StatusCode> {
213 let Some(ids) = &request.subscription_ids else {
214 return Err(StatusCode::BadNothingToDo);
215 };
216 if ids.is_empty() {
217 return Err(StatusCode::BadNothingToDo);
218 }
219
220 let mut results = Vec::new();
221 for id in ids {
222 results.push(match self.subscriptions.get_mut(id) {
223 Some(sub) => {
224 sub.set_publishing_enabled(request.publishing_enabled);
225 sub.reset_lifetime_counter();
226 StatusCode::Good
227 }
228 None => StatusCode::BadSubscriptionIdInvalid,
229 })
230 }
231 Ok(SetPublishingModeResponse {
232 response_header: ResponseHeader::new_good(&request.request_header),
233 results: Some(results),
234 diagnostic_infos: None,
235 })
236 }
237
238 pub(super) fn republish(
239 &self,
240 request: &RepublishRequest,
241 ) -> Result<RepublishResponse, StatusCode> {
242 let msg = self.find_notification_message(
243 request.subscription_id,
244 request.retransmit_sequence_number,
245 )?;
246 Ok(RepublishResponse {
247 response_header: ResponseHeader::new_good(&request.request_header),
248 notification_message: msg,
249 })
250 }
251
252 pub(super) fn create_monitored_items(
253 &mut self,
254 subscription_id: u32,
255 requests: &[CreateMonitoredItem],
256 ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
257 let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
258 return Err(StatusCode::BadSubscriptionIdInvalid);
259 };
260
261 let mut results = Vec::with_capacity(requests.len());
262 for item in requests {
263 let filter_result = item
264 .filter_res()
265 .map(|r| ExtensionObject::from_message(r.clone()))
266 .unwrap_or_else(ExtensionObject::null);
267 if item.status_code().is_good() {
268 let new_item = MonitoredItem::new(item);
269 results.push(MonitoredItemCreateResult {
270 status_code: StatusCode::Good,
271 monitored_item_id: new_item.id(),
272 revised_sampling_interval: new_item.sampling_interval(),
273 revised_queue_size: new_item.queue_size() as u32,
274 filter_result,
275 });
276 sub.insert(new_item.id(), new_item);
277 } else {
278 results.push(MonitoredItemCreateResult {
279 status_code: item.status_code(),
280 monitored_item_id: 0,
281 revised_sampling_interval: item.sampling_interval(),
282 revised_queue_size: item.queue_size() as u32,
283 filter_result,
284 });
285 }
286 }
287
288 Ok(results)
289 }
290
291 pub(super) fn modify_monitored_items(
292 &mut self,
293 subscription_id: u32,
294 info: &ServerInfo,
295 timestamps_to_return: TimestampsToReturn,
296 requests: Vec<MonitoredItemModifyRequest>,
297 type_tree: &dyn TypeTree,
298 ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
299 let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
300 return Err(StatusCode::BadSubscriptionIdInvalid);
301 };
302 let mut results = Vec::with_capacity(requests.len());
303 for request in requests {
304 if let Some(item) = sub.get_mut(&request.monitored_item_id) {
305 let (filter_result, status) =
306 item.modify(info, timestamps_to_return, &request, type_tree);
307 let filter_result = filter_result
308 .map(ExtensionObject::from_message)
309 .unwrap_or_else(ExtensionObject::null);
310
311 results.push(MonitoredItemUpdateRef::new(
312 MonitoredItemHandle {
313 subscription_id,
314 monitored_item_id: item.id(),
315 },
316 item.item_to_monitor().node_id.clone(),
317 item.item_to_monitor().attribute_id,
318 MonitoredItemModifyResult {
319 status_code: status,
320 revised_sampling_interval: item.sampling_interval(),
321 revised_queue_size: item.queue_size() as u32,
322 filter_result,
323 },
324 ));
325 } else {
326 results.push(MonitoredItemUpdateRef::new(
327 MonitoredItemHandle {
328 subscription_id,
329 monitored_item_id: request.monitored_item_id,
330 },
331 NodeId::null(),
332 AttributeId::NodeId,
333 MonitoredItemModifyResult {
334 status_code: StatusCode::BadMonitoredItemIdInvalid,
335 revised_sampling_interval: 0.0,
336 revised_queue_size: 0,
337 filter_result: ExtensionObject::null(),
338 },
339 ));
340 }
341 }
342
343 Ok(results)
344 }
345
346 pub(super) fn set_monitoring_mode(
347 &mut self,
348 subscription_id: u32,
349 monitoring_mode: MonitoringMode,
350 items: Vec<u32>,
351 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
352 let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
353 return Err(StatusCode::BadSubscriptionIdInvalid);
354 };
355 let mut results = Vec::with_capacity(items.len());
356 for id in items {
357 let handle = MonitoredItemHandle {
358 subscription_id,
359 monitored_item_id: id,
360 };
361 if let Some(item) = sub.get_mut(&id) {
362 results.push((
363 StatusCode::Good,
364 MonitoredItemRef::new(
365 handle,
366 item.item_to_monitor().node_id.clone(),
367 item.item_to_monitor().attribute_id,
368 ),
369 ));
370 item.set_monitoring_mode(monitoring_mode);
371 } else {
372 results.push((
373 StatusCode::BadMonitoredItemIdInvalid,
374 MonitoredItemRef::new(handle, NodeId::null(), AttributeId::NodeId),
375 ));
376 }
377 }
378 Ok(results)
379 }
380
381 fn filter_links(links: Vec<u32>, sub: &Subscription) -> (Vec<u32>, Vec<StatusCode>) {
382 let mut to_apply = Vec::with_capacity(links.len());
383 let mut results = Vec::with_capacity(links.len());
384
385 for link in links {
386 if sub.contains_key(&link) {
387 to_apply.push(link);
388 results.push(StatusCode::Good);
389 } else {
390 results.push(StatusCode::BadMonitoredItemIdInvalid);
391 }
392 }
393 (to_apply, results)
394 }
395
396 pub(super) fn set_triggering(
397 &mut self,
398 subscription_id: u32,
399 triggering_item_id: u32,
400 links_to_add: Vec<u32>,
401 links_to_remove: Vec<u32>,
402 ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
403 let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
404 return Err(StatusCode::BadSubscriptionIdInvalid);
405 };
406 if !sub.contains_key(&triggering_item_id) {
407 return Err(StatusCode::BadMonitoredItemIdInvalid);
408 }
409
410 let (to_add, add_results) = Self::filter_links(links_to_add, sub);
411 let (to_remove, remove_results) = Self::filter_links(links_to_remove, sub);
412
413 let item = sub.get_mut(&triggering_item_id).unwrap();
414
415 item.set_triggering(&to_add, &to_remove);
416
417 Ok((add_results, remove_results))
418 }
419
420 pub(super) fn delete_monitored_items(
421 &mut self,
422 subscription_id: u32,
423 items: &[u32],
424 ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
425 let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
426 return Err(StatusCode::BadSubscriptionIdInvalid);
427 };
428 let mut results = Vec::with_capacity(items.len());
429 for id in items {
430 let handle = MonitoredItemHandle {
431 subscription_id,
432 monitored_item_id: *id,
433 };
434 if let Some(item) = sub.remove(id) {
435 results.push((
436 StatusCode::Good,
437 MonitoredItemRef::new(
438 handle,
439 item.item_to_monitor().node_id.clone(),
440 item.item_to_monitor().attribute_id,
441 ),
442 ));
443 } else {
444 results.push((
445 StatusCode::BadMonitoredItemIdInvalid,
446 MonitoredItemRef::new(handle, NodeId::null(), AttributeId::NodeId),
447 ))
448 }
449 }
450 Ok(results)
451 }
452
453 pub(super) fn delete_subscriptions(
454 &mut self,
455 ids: &[u32],
456 ) -> Vec<(StatusCode, Vec<MonitoredItemRef>)> {
457 let id_set: HashSet<_> = ids.iter().copied().collect();
458 let mut result = Vec::with_capacity(ids.len());
459 for id in ids {
460 let Some(mut sub) = self.subscriptions.remove(id) else {
461 result.push((StatusCode::BadSubscriptionIdInvalid, Vec::new()));
462 continue;
463 };
464
465 let items = sub
466 .drain()
467 .map(|item| {
468 MonitoredItemRef::new(
469 MonitoredItemHandle {
470 subscription_id: *id,
471 monitored_item_id: item.1.id(),
472 },
473 item.1.item_to_monitor().node_id.clone(),
474 item.1.item_to_monitor().attribute_id,
475 )
476 })
477 .collect();
478
479 result.push((StatusCode::Good, items))
480 }
481
482 self.retransmission_queue
483 .retain(|r| !id_set.contains(&r.subscription_id));
484
485 result
486 }
487
488 fn revise_subscription_values(
491 info: &ServerInfo,
492 requested_publishing_interval: f64,
493 requested_max_keep_alive_count: u32,
494 requested_lifetime_count: u32,
495 ) -> (f64, u32, u32) {
496 let revised_publishing_interval = f64::max(
497 requested_publishing_interval,
498 info.config.limits.subscriptions.min_publishing_interval_ms,
499 );
500 let revised_max_keep_alive_count = if requested_max_keep_alive_count
501 > info.config.limits.subscriptions.max_keep_alive_count
502 {
503 info.config.limits.subscriptions.max_keep_alive_count
504 } else if requested_max_keep_alive_count == 0 {
505 info.config.limits.subscriptions.default_keep_alive_count
506 } else {
507 requested_max_keep_alive_count
508 };
509 let min_lifetime_count = revised_max_keep_alive_count * 3;
511 let revised_lifetime_count = if requested_lifetime_count < min_lifetime_count {
512 min_lifetime_count
513 } else if requested_lifetime_count > info.config.limits.subscriptions.max_lifetime_count {
514 info.config.limits.subscriptions.max_lifetime_count
515 } else {
516 requested_lifetime_count
517 };
518 (
519 revised_publishing_interval,
520 revised_max_keep_alive_count,
521 revised_lifetime_count,
522 )
523 }
524
525 fn revise_max_notifications_per_publish(&self, inp: u32) -> u64 {
526 if self.limits.max_notifications_per_publish == 0 {
527 inp as u64
528 } else if inp == 0 || inp as u64 > self.limits.max_notifications_per_publish {
529 self.limits.max_notifications_per_publish
530 } else {
531 inp as u64
532 }
533 }
534
535 pub(crate) fn enqueue_publish_request(
536 &mut self,
537 now: &DateTimeUtc,
538 now_instant: Instant,
539 mut request: PendingPublish,
540 ) {
541 if self.publish_request_queue.len() >= self.max_publish_requests() {
542 let _ = self.tick(now, now_instant, TickReason::ReceivePublishRequest);
544 }
545
546 if self.publish_request_queue.len() >= self.max_publish_requests() {
547 let req = self.publish_request_queue.pop_front().unwrap();
549 let _ = req.response.send(
552 ServiceFault::new(
553 &req.request.request_header,
554 StatusCode::BadTooManyPublishRequests,
555 )
556 .into(),
557 );
558 }
559
560 request.ack_results = self.process_subscription_acks(&request.request);
561 self.publish_request_queue.push_back(request);
562 self.tick(now, now_instant, TickReason::ReceivePublishRequest);
563 }
564
565 pub(crate) fn tick(
566 &mut self,
567 now: &DateTimeUtc,
568 now_instant: Instant,
569 tick_reason: TickReason,
570 ) -> Vec<MonitoredItemRef> {
571 let mut to_delete = Vec::new();
572 if self.subscriptions.is_empty() {
573 for pb in self.publish_request_queue.drain(..) {
574 let _ = pb.response.send(
575 ServiceFault::new(&pb.request.request_header, StatusCode::BadNoSubscription)
576 .into(),
577 );
578 }
579 return to_delete;
580 }
581
582 self.remove_expired_publish_requests(now_instant);
583
584 let subscription_ids = {
585 let mut subscription_priority: Vec<(u32, u8)> = self
587 .subscriptions
588 .values()
589 .map(|v| (v.id(), v.priority()))
590 .collect();
591 subscription_priority.sort_by(|s1, s2| s1.1.cmp(&s2.1));
592 subscription_priority.into_iter().map(|s| s.0)
593 };
594
595 let mut responses = Vec::new();
596 let mut more_notifications = false;
597
598 for sub_id in subscription_ids {
599 let subscription = self.subscriptions.get_mut(&sub_id).unwrap();
600 let res = subscription.tick(
601 now,
602 now_instant,
603 tick_reason,
604 !self.publish_request_queue.is_empty(),
605 );
606 while !self.publish_request_queue.is_empty() {
608 if let Some(notification_message) = subscription.take_notification() {
609 tracing::trace!("Sending notification message {:?}", notification_message);
610 let publish_request = self.publish_request_queue.pop_front().unwrap();
611 responses.push((publish_request, notification_message, sub_id));
612 } else {
613 break;
614 }
615 }
616 more_notifications |= subscription.more_notifications();
618
619 if matches!(res, TickResult::Expired) {
622 to_delete.extend(subscription.drain().map(|item| {
623 MonitoredItemRef::new(
624 MonitoredItemHandle {
625 subscription_id: sub_id,
626 monitored_item_id: item.1.id(),
627 },
628 item.1.item_to_monitor().node_id.clone(),
629 item.1.item_to_monitor().attribute_id,
630 )
631 }))
632 }
633
634 if subscription.ready_to_remove() {
635 self.subscriptions.remove(&sub_id);
636 self.retransmission_queue
637 .retain(|f| f.subscription_id != sub_id);
638 }
639 }
640
641 let num_responses = responses.len();
642 for (idx, (publish_request, notification, subscription_id)) in
643 responses.into_iter().enumerate()
644 {
645 let is_last = idx == num_responses - 1;
646
647 if self.retransmission_queue.len() >= self.max_publish_requests() * 2 {
648 self.retransmission_queue.pop_front();
649 }
650 self.retransmission_queue.push_back(NonAckedPublish {
651 message: notification.clone(),
652 subscription_id,
653 });
654
655 let available_sequence_numbers = self.available_sequence_numbers(subscription_id);
659
660 let _ = publish_request.response.send(
661 PublishResponse {
662 response_header: ResponseHeader::new_timestamped_service_result(
663 DateTime::from(*now),
664 &publish_request.request.request_header,
665 StatusCode::Good,
666 ),
667 subscription_id,
668 available_sequence_numbers,
669 more_notifications: is_last && more_notifications,
671 notification_message: notification,
672 results: publish_request.ack_results,
673 diagnostic_infos: None,
674 }
675 .into(),
676 );
677 }
678
679 to_delete
680 }
681
682 fn find_notification_message(
683 &self,
684 subscription_id: u32,
685 sequence_number: u32,
686 ) -> Result<NotificationMessage, StatusCode> {
687 if !self.subscriptions.contains_key(&subscription_id) {
688 return Err(StatusCode::BadSubscriptionIdInvalid);
689 }
690 let Some(notification) = self.retransmission_queue.iter().find(|m| {
691 m.subscription_id == subscription_id && m.message.sequence_number == sequence_number
692 }) else {
693 return Err(StatusCode::BadMessageNotAvailable);
694 };
695 Ok(notification.message.clone())
696 }
697
698 fn remove_expired_publish_requests(&mut self, now: Instant) {
699 let mut idx = 0;
700 while idx < self.publish_request_queue.len() {
701 if self.publish_request_queue[idx].deadline < now {
702 let req = self.publish_request_queue.remove(idx).unwrap();
703 let _ = req.response.send(
704 ServiceFault::new(&req.request.request_header, StatusCode::BadTimeout).into(),
705 );
706 } else {
707 idx += 1;
708 }
709 }
710 }
711
712 fn process_subscription_acks(&mut self, request: &PublishRequest) -> Option<Vec<StatusCode>> {
713 let acks = request.subscription_acknowledgements.as_ref()?;
714 if acks.is_empty() {
715 return None;
716 }
717
718 Some(
719 acks.iter()
720 .map(|ack| {
721 if !self.subscriptions.contains_key(&ack.subscription_id) {
722 StatusCode::BadSubscriptionIdInvalid
723 } else if let Some((idx, _)) =
724 self.retransmission_queue.iter().enumerate().find(|(_, p)| {
725 p.subscription_id == ack.subscription_id
726 && p.message.sequence_number == ack.sequence_number
727 })
728 {
729 self.retransmission_queue.remove(idx);
735 StatusCode::Good
736 } else {
737 StatusCode::BadSequenceNumberUnknown
738 }
739 })
740 .collect(),
741 )
742 }
743
744 pub(super) fn available_sequence_numbers(&self, subscription_id: u32) -> Option<Vec<u32>> {
746 if self.retransmission_queue.is_empty() {
747 return None;
748 }
749 let sequence_numbers: Vec<u32> = self
751 .retransmission_queue
752 .iter()
753 .filter(|&k| k.subscription_id == subscription_id)
754 .map(|k| k.message.sequence_number)
755 .collect();
756 if sequence_numbers.is_empty() {
757 None
758 } else {
759 Some(sequence_numbers)
760 }
761 }
762
763 pub(super) fn notify_data_changes(&mut self, values: Vec<(MonitoredItemHandle, DataValue)>) {
764 let now = DateTime::now();
765 for (handle, value) in values {
766 let Some(sub) = self.subscriptions.get_mut(&handle.subscription_id) else {
767 continue;
768 };
769 sub.notify_data_value(&handle.monitored_item_id, value, &now);
770 }
771 }
772
773 pub(super) fn notify_events(&mut self, events: Vec<(MonitoredItemHandle, &dyn Event)>) {
774 let mut lck = None;
776 for (handle, event) in events {
777 let Some(sub) = self.subscriptions.get_mut(&handle.subscription_id) else {
778 continue;
779 };
780 let type_tree = lck.get_or_insert_with(|| self.type_tree_for_user.get_type_tree());
781 sub.notify_event(&handle.monitored_item_id, event, type_tree.get());
782 }
783 }
784
785 pub(super) fn user_token(&self) -> &PersistentSessionKey {
786 &self.user_token
787 }
788
789 pub(super) fn get_monitored_item_count(&self, subscription_id: u32) -> Option<usize> {
790 self.subscriptions.get(&subscription_id).map(|s| s.len())
791 }
792
793 pub fn session(&self) -> &Arc<RwLock<Session>> {
795 &self.session
796 }
797}