Skip to main content

opcua_server/subscriptions/
session_subscriptions.rs

1use std::{
2    collections::VecDeque,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use super::{
8    monitored_item::MonitoredItem,
9    subscription::{MonitoredItemHandle, Subscription, TickReason, TickResult},
10    CreateMonitoredItem, NonAckedPublish, PendingPublish, PersistentSessionKey,
11};
12use hashbrown::{HashMap, HashSet};
13use opcua_nodes::{Event, TypeTree};
14
15use crate::{
16    info::ServerInfo,
17    node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, TypeTreeForUserStatic},
18    session::instance::Session,
19    SubscriptionLimits,
20};
21use opcua_core::sync::RwLock;
22use opcua_types::{
23    AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataValue, DateTime,
24    DateTimeUtc, ExtensionObject, ModifySubscriptionRequest, ModifySubscriptionResponse,
25    MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoredItemModifyResult,
26    MonitoringMode, NodeId, NotificationMessage, PublishRequest, PublishResponse, RepublishRequest,
27    RepublishResponse, ResponseHeader, ServiceFault, SetPublishingModeRequest,
28    SetPublishingModeResponse, StatusCode, TimestampsToReturn,
29};
30
31/// Subscriptions belonging to a single session. Note that they are technically _owned_ by
32/// a user token, which means that they can be transfered to a different session.
33pub struct SessionSubscriptions {
34    /// Identity token of the user that created the subscription, used for transfer subscriptions.
35    user_token: PersistentSessionKey,
36    /// Subscriptions associated with the session.
37    subscriptions: HashMap<u32, Subscription>,
38    /// Publish request queue (requests by the client on the session)
39    publish_request_queue: VecDeque<PendingPublish>,
40    /// Notifications that have been sent but have yet to be acknowledged (retransmission queue).
41    retransmission_queue: VecDeque<NonAckedPublish>,
42    /// Configured limits on subscriptions.
43    limits: SubscriptionLimits,
44
45    /// Static reference to the session owning this, required to cleanly handle deletion.
46    session: Arc<RwLock<Session>>,
47    /// Static reference to the type-tree for the user owning this.
48    type_tree_for_user: Arc<dyn TypeTreeForUserStatic>,
49}
50
51impl SessionSubscriptions {
52    pub(super) fn new(
53        limits: SubscriptionLimits,
54        user_token: PersistentSessionKey,
55        session: Arc<RwLock<Session>>,
56        type_tree_for_user: Arc<dyn TypeTreeForUserStatic>,
57    ) -> Self {
58        Self {
59            user_token,
60            subscriptions: HashMap::new(),
61            publish_request_queue: VecDeque::new(),
62            retransmission_queue: VecDeque::new(),
63            limits,
64            session,
65            type_tree_for_user,
66        }
67    }
68
69    fn max_publish_requests(&self) -> usize {
70        self.limits
71            .max_pending_publish_requests
72            .min(self.subscriptions.len() * self.limits.max_publish_requests_per_subscription)
73            .max(1)
74    }
75
76    pub(super) fn is_ready_to_delete(&self) -> bool {
77        self.subscriptions.is_empty() && self.publish_request_queue.is_empty()
78    }
79
80    #[allow(clippy::result_large_err)]
81    pub(super) fn insert(
82        &mut self,
83        subscription: Subscription,
84        notifs: Vec<NonAckedPublish>,
85    ) -> Result<(), (StatusCode, Subscription, Vec<NonAckedPublish>)> {
86        if self.subscriptions.len() >= self.limits.max_subscriptions_per_session {
87            return Err((StatusCode::BadTooManySubscriptions, subscription, notifs));
88        }
89        self.subscriptions.insert(subscription.id(), subscription);
90        for notif in notifs {
91            self.retransmission_queue.push_back(notif);
92        }
93        Ok(())
94    }
95
96    /// Return `true` if the session has a subscription with ID given by
97    /// `sub_id`.
98    pub fn contains(&self, sub_id: u32) -> bool {
99        self.subscriptions.contains_key(&sub_id)
100    }
101
102    /// Return a vector of all the subscription IDs in this session.
103    pub fn subscription_ids(&self) -> Vec<u32> {
104        self.subscriptions.keys().copied().collect()
105    }
106
107    pub(super) fn remove(
108        &mut self,
109        subscription_id: u32,
110    ) -> (Option<Subscription>, Vec<NonAckedPublish>) {
111        let mut notifs = Vec::new();
112        let mut idx = 0;
113        while idx < self.retransmission_queue.len() {
114            if self.retransmission_queue[idx].subscription_id == subscription_id {
115                notifs.push(self.retransmission_queue.remove(idx).unwrap());
116            } else {
117                idx += 1;
118            }
119        }
120
121        (self.subscriptions.remove(&subscription_id), notifs)
122    }
123
124    /// Get a mutable reference to a subscription by ID.
125    pub fn get_mut(&mut self, subscription_id: u32) -> Option<&mut Subscription> {
126        self.subscriptions.get_mut(&subscription_id)
127    }
128
129    /// Get a reference to a subscription by ID.
130    pub fn get(&self, subscription_id: u32) -> Option<&Subscription> {
131        self.subscriptions.get(&subscription_id)
132    }
133
134    pub(super) fn create_subscription(
135        &mut self,
136        request: &CreateSubscriptionRequest,
137        info: &ServerInfo,
138    ) -> Result<CreateSubscriptionResponse, StatusCode> {
139        if self.subscriptions.len() >= self.limits.max_subscriptions_per_session {
140            return Err(StatusCode::BadTooManySubscriptions);
141        }
142        let subscription_id = info.subscription_id_handle.next();
143
144        let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
145            Self::revise_subscription_values(
146                info,
147                request.requested_publishing_interval,
148                request.requested_max_keep_alive_count,
149                request.requested_lifetime_count,
150            );
151
152        let subscription = Subscription::new(
153            subscription_id,
154            request.publishing_enabled,
155            Duration::from_millis(revised_publishing_interval as u64),
156            revised_lifetime_count,
157            revised_max_keep_alive_count,
158            request.priority,
159            self.limits.max_queued_notifications,
160            self.revise_max_notifications_per_publish(request.max_notifications_per_publish),
161        );
162        self.subscriptions.insert(subscription.id(), subscription);
163        Ok(CreateSubscriptionResponse {
164            response_header: ResponseHeader::new_good(&request.request_header),
165            subscription_id,
166            revised_publishing_interval,
167            revised_lifetime_count,
168            revised_max_keep_alive_count,
169        })
170    }
171
172    pub(super) fn modify_subscription(
173        &mut self,
174        request: &ModifySubscriptionRequest,
175        info: &ServerInfo,
176    ) -> Result<ModifySubscriptionResponse, StatusCode> {
177        let max_notifications_per_publish =
178            self.revise_max_notifications_per_publish(request.max_notifications_per_publish);
179        let Some(subscription) = self.subscriptions.get_mut(&request.subscription_id) else {
180            return Err(StatusCode::BadSubscriptionIdInvalid);
181        };
182
183        let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
184            Self::revise_subscription_values(
185                info,
186                request.requested_publishing_interval,
187                request.requested_max_keep_alive_count,
188                request.requested_lifetime_count,
189            );
190
191        subscription.set_publishing_interval(Duration::from_micros(
192            (revised_publishing_interval * 1000.0) as u64,
193        ));
194        subscription.set_max_keep_alive_counter(revised_max_keep_alive_count);
195        subscription.set_max_lifetime_counter(revised_lifetime_count);
196        subscription.set_priority(request.priority);
197        subscription.reset_lifetime_counter();
198        subscription.reset_keep_alive_counter();
199        subscription.set_max_notifications_per_publish(max_notifications_per_publish);
200
201        Ok(ModifySubscriptionResponse {
202            response_header: ResponseHeader::new_good(&request.request_header),
203            revised_publishing_interval,
204            revised_lifetime_count,
205            revised_max_keep_alive_count,
206        })
207    }
208
209    pub(super) fn set_publishing_mode(
210        &mut self,
211        request: &SetPublishingModeRequest,
212    ) -> Result<SetPublishingModeResponse, StatusCode> {
213        let Some(ids) = &request.subscription_ids else {
214            return Err(StatusCode::BadNothingToDo);
215        };
216        if ids.is_empty() {
217            return Err(StatusCode::BadNothingToDo);
218        }
219
220        let mut results = Vec::new();
221        for id in ids {
222            results.push(match self.subscriptions.get_mut(id) {
223                Some(sub) => {
224                    sub.set_publishing_enabled(request.publishing_enabled);
225                    sub.reset_lifetime_counter();
226                    StatusCode::Good
227                }
228                None => StatusCode::BadSubscriptionIdInvalid,
229            })
230        }
231        Ok(SetPublishingModeResponse {
232            response_header: ResponseHeader::new_good(&request.request_header),
233            results: Some(results),
234            diagnostic_infos: None,
235        })
236    }
237
238    pub(super) fn republish(
239        &self,
240        request: &RepublishRequest,
241    ) -> Result<RepublishResponse, StatusCode> {
242        let msg = self.find_notification_message(
243            request.subscription_id,
244            request.retransmit_sequence_number,
245        )?;
246        Ok(RepublishResponse {
247            response_header: ResponseHeader::new_good(&request.request_header),
248            notification_message: msg,
249        })
250    }
251
252    pub(super) fn create_monitored_items(
253        &mut self,
254        subscription_id: u32,
255        requests: &[CreateMonitoredItem],
256    ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
257        let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
258            return Err(StatusCode::BadSubscriptionIdInvalid);
259        };
260
261        let mut results = Vec::with_capacity(requests.len());
262        for item in requests {
263            let filter_result = item
264                .filter_res()
265                .map(|r| ExtensionObject::from_message(r.clone()))
266                .unwrap_or_else(ExtensionObject::null);
267            if item.status_code().is_good() {
268                let new_item = MonitoredItem::new(item);
269                results.push(MonitoredItemCreateResult {
270                    status_code: StatusCode::Good,
271                    monitored_item_id: new_item.id(),
272                    revised_sampling_interval: new_item.sampling_interval(),
273                    revised_queue_size: new_item.queue_size() as u32,
274                    filter_result,
275                });
276                sub.insert(new_item.id(), new_item);
277            } else {
278                results.push(MonitoredItemCreateResult {
279                    status_code: item.status_code(),
280                    monitored_item_id: 0,
281                    revised_sampling_interval: item.sampling_interval(),
282                    revised_queue_size: item.queue_size() as u32,
283                    filter_result,
284                });
285            }
286        }
287
288        Ok(results)
289    }
290
291    pub(super) fn modify_monitored_items(
292        &mut self,
293        subscription_id: u32,
294        info: &ServerInfo,
295        timestamps_to_return: TimestampsToReturn,
296        requests: Vec<MonitoredItemModifyRequest>,
297        type_tree: &dyn TypeTree,
298    ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
299        let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
300            return Err(StatusCode::BadSubscriptionIdInvalid);
301        };
302        let mut results = Vec::with_capacity(requests.len());
303        for request in requests {
304            if let Some(item) = sub.get_mut(&request.monitored_item_id) {
305                let (filter_result, status) =
306                    item.modify(info, timestamps_to_return, &request, type_tree);
307                let filter_result = filter_result
308                    .map(ExtensionObject::from_message)
309                    .unwrap_or_else(ExtensionObject::null);
310
311                results.push(MonitoredItemUpdateRef::new(
312                    MonitoredItemHandle {
313                        subscription_id,
314                        monitored_item_id: item.id(),
315                    },
316                    item.item_to_monitor().node_id.clone(),
317                    item.item_to_monitor().attribute_id,
318                    MonitoredItemModifyResult {
319                        status_code: status,
320                        revised_sampling_interval: item.sampling_interval(),
321                        revised_queue_size: item.queue_size() as u32,
322                        filter_result,
323                    },
324                ));
325            } else {
326                results.push(MonitoredItemUpdateRef::new(
327                    MonitoredItemHandle {
328                        subscription_id,
329                        monitored_item_id: request.monitored_item_id,
330                    },
331                    NodeId::null(),
332                    AttributeId::NodeId,
333                    MonitoredItemModifyResult {
334                        status_code: StatusCode::BadMonitoredItemIdInvalid,
335                        revised_sampling_interval: 0.0,
336                        revised_queue_size: 0,
337                        filter_result: ExtensionObject::null(),
338                    },
339                ));
340            }
341        }
342
343        Ok(results)
344    }
345
346    pub(super) fn set_monitoring_mode(
347        &mut self,
348        subscription_id: u32,
349        monitoring_mode: MonitoringMode,
350        items: Vec<u32>,
351    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
352        let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
353            return Err(StatusCode::BadSubscriptionIdInvalid);
354        };
355        let mut results = Vec::with_capacity(items.len());
356        for id in items {
357            let handle = MonitoredItemHandle {
358                subscription_id,
359                monitored_item_id: id,
360            };
361            if let Some(item) = sub.get_mut(&id) {
362                results.push((
363                    StatusCode::Good,
364                    MonitoredItemRef::new(
365                        handle,
366                        item.item_to_monitor().node_id.clone(),
367                        item.item_to_monitor().attribute_id,
368                    ),
369                ));
370                item.set_monitoring_mode(monitoring_mode);
371            } else {
372                results.push((
373                    StatusCode::BadMonitoredItemIdInvalid,
374                    MonitoredItemRef::new(handle, NodeId::null(), AttributeId::NodeId),
375                ));
376            }
377        }
378        Ok(results)
379    }
380
381    fn filter_links(links: Vec<u32>, sub: &Subscription) -> (Vec<u32>, Vec<StatusCode>) {
382        let mut to_apply = Vec::with_capacity(links.len());
383        let mut results = Vec::with_capacity(links.len());
384
385        for link in links {
386            if sub.contains_key(&link) {
387                to_apply.push(link);
388                results.push(StatusCode::Good);
389            } else {
390                results.push(StatusCode::BadMonitoredItemIdInvalid);
391            }
392        }
393        (to_apply, results)
394    }
395
396    pub(super) fn set_triggering(
397        &mut self,
398        subscription_id: u32,
399        triggering_item_id: u32,
400        links_to_add: Vec<u32>,
401        links_to_remove: Vec<u32>,
402    ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
403        let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
404            return Err(StatusCode::BadSubscriptionIdInvalid);
405        };
406        if !sub.contains_key(&triggering_item_id) {
407            return Err(StatusCode::BadMonitoredItemIdInvalid);
408        }
409
410        let (to_add, add_results) = Self::filter_links(links_to_add, sub);
411        let (to_remove, remove_results) = Self::filter_links(links_to_remove, sub);
412
413        let item = sub.get_mut(&triggering_item_id).unwrap();
414
415        item.set_triggering(&to_add, &to_remove);
416
417        Ok((add_results, remove_results))
418    }
419
420    pub(super) fn delete_monitored_items(
421        &mut self,
422        subscription_id: u32,
423        items: &[u32],
424    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
425        let Some(sub) = self.subscriptions.get_mut(&subscription_id) else {
426            return Err(StatusCode::BadSubscriptionIdInvalid);
427        };
428        let mut results = Vec::with_capacity(items.len());
429        for id in items {
430            let handle = MonitoredItemHandle {
431                subscription_id,
432                monitored_item_id: *id,
433            };
434            if let Some(item) = sub.remove(id) {
435                results.push((
436                    StatusCode::Good,
437                    MonitoredItemRef::new(
438                        handle,
439                        item.item_to_monitor().node_id.clone(),
440                        item.item_to_monitor().attribute_id,
441                    ),
442                ));
443            } else {
444                results.push((
445                    StatusCode::BadMonitoredItemIdInvalid,
446                    MonitoredItemRef::new(handle, NodeId::null(), AttributeId::NodeId),
447                ))
448            }
449        }
450        Ok(results)
451    }
452
453    pub(super) fn delete_subscriptions(
454        &mut self,
455        ids: &[u32],
456    ) -> Vec<(StatusCode, Vec<MonitoredItemRef>)> {
457        let id_set: HashSet<_> = ids.iter().copied().collect();
458        let mut result = Vec::with_capacity(ids.len());
459        for id in ids {
460            let Some(mut sub) = self.subscriptions.remove(id) else {
461                result.push((StatusCode::BadSubscriptionIdInvalid, Vec::new()));
462                continue;
463            };
464
465            let items = sub
466                .drain()
467                .map(|item| {
468                    MonitoredItemRef::new(
469                        MonitoredItemHandle {
470                            subscription_id: *id,
471                            monitored_item_id: item.1.id(),
472                        },
473                        item.1.item_to_monitor().node_id.clone(),
474                        item.1.item_to_monitor().attribute_id,
475                    )
476                })
477                .collect();
478
479            result.push((StatusCode::Good, items))
480        }
481
482        self.retransmission_queue
483            .retain(|r| !id_set.contains(&r.subscription_id));
484
485        result
486    }
487
488    /// This function takes the requested values passed in a create / modify and returns revised
489    /// values that conform to the server's limits. For simplicity the return type is a tuple
490    fn revise_subscription_values(
491        info: &ServerInfo,
492        requested_publishing_interval: f64,
493        requested_max_keep_alive_count: u32,
494        requested_lifetime_count: u32,
495    ) -> (f64, u32, u32) {
496        let revised_publishing_interval = f64::max(
497            requested_publishing_interval,
498            info.config.limits.subscriptions.min_publishing_interval_ms,
499        );
500        let revised_max_keep_alive_count = if requested_max_keep_alive_count
501            > info.config.limits.subscriptions.max_keep_alive_count
502        {
503            info.config.limits.subscriptions.max_keep_alive_count
504        } else if requested_max_keep_alive_count == 0 {
505            info.config.limits.subscriptions.default_keep_alive_count
506        } else {
507            requested_max_keep_alive_count
508        };
509        // Lifetime count must exceed keep alive count by at least a multiple of
510        let min_lifetime_count = revised_max_keep_alive_count * 3;
511        let revised_lifetime_count = if requested_lifetime_count < min_lifetime_count {
512            min_lifetime_count
513        } else if requested_lifetime_count > info.config.limits.subscriptions.max_lifetime_count {
514            info.config.limits.subscriptions.max_lifetime_count
515        } else {
516            requested_lifetime_count
517        };
518        (
519            revised_publishing_interval,
520            revised_max_keep_alive_count,
521            revised_lifetime_count,
522        )
523    }
524
525    fn revise_max_notifications_per_publish(&self, inp: u32) -> u64 {
526        if self.limits.max_notifications_per_publish == 0 {
527            inp as u64
528        } else if inp == 0 || inp as u64 > self.limits.max_notifications_per_publish {
529            self.limits.max_notifications_per_publish
530        } else {
531            inp as u64
532        }
533    }
534
535    pub(crate) fn enqueue_publish_request(
536        &mut self,
537        now: &DateTimeUtc,
538        now_instant: Instant,
539        mut request: PendingPublish,
540    ) {
541        if self.publish_request_queue.len() >= self.max_publish_requests() {
542            // Tick to trigger publish, maybe remove a request to make space for new one
543            let _ = self.tick(now, now_instant, TickReason::ReceivePublishRequest);
544        }
545
546        if self.publish_request_queue.len() >= self.max_publish_requests() {
547            // Pop the oldest publish request from the queue and return it with an error
548            let req = self.publish_request_queue.pop_front().unwrap();
549            // Ignore the result of this, if it fails it just means that the
550            // channel is disconnected.
551            let _ = req.response.send(
552                ServiceFault::new(
553                    &req.request.request_header,
554                    StatusCode::BadTooManyPublishRequests,
555                )
556                .into(),
557            );
558        }
559
560        request.ack_results = self.process_subscription_acks(&request.request);
561        self.publish_request_queue.push_back(request);
562        self.tick(now, now_instant, TickReason::ReceivePublishRequest);
563    }
564
565    pub(crate) fn tick(
566        &mut self,
567        now: &DateTimeUtc,
568        now_instant: Instant,
569        tick_reason: TickReason,
570    ) -> Vec<MonitoredItemRef> {
571        let mut to_delete = Vec::new();
572        if self.subscriptions.is_empty() {
573            for pb in self.publish_request_queue.drain(..) {
574                let _ = pb.response.send(
575                    ServiceFault::new(&pb.request.request_header, StatusCode::BadNoSubscription)
576                        .into(),
577                );
578            }
579            return to_delete;
580        }
581
582        self.remove_expired_publish_requests(now_instant);
583
584        let subscription_ids = {
585            // Sort subscriptions by priority
586            let mut subscription_priority: Vec<(u32, u8)> = self
587                .subscriptions
588                .values()
589                .map(|v| (v.id(), v.priority()))
590                .collect();
591            subscription_priority.sort_by(|s1, s2| s1.1.cmp(&s2.1));
592            subscription_priority.into_iter().map(|s| s.0)
593        };
594
595        let mut responses = Vec::new();
596        let mut more_notifications = false;
597
598        for sub_id in subscription_ids {
599            let subscription = self.subscriptions.get_mut(&sub_id).unwrap();
600            let res = subscription.tick(
601                now,
602                now_instant,
603                tick_reason,
604                !self.publish_request_queue.is_empty(),
605            );
606            // Get notifications and publish request pairs while there are any of either left.
607            while !self.publish_request_queue.is_empty() {
608                if let Some(notification_message) = subscription.take_notification() {
609                    tracing::trace!("Sending notification message {:?}", notification_message);
610                    let publish_request = self.publish_request_queue.pop_front().unwrap();
611                    responses.push((publish_request, notification_message, sub_id));
612                } else {
613                    break;
614                }
615            }
616            // Make sure to note if there are more notifications in any subscription.
617            more_notifications |= subscription.more_notifications();
618
619            // If the subscription expired, make sure to collect any deleted monitored items.
620
621            if matches!(res, TickResult::Expired) {
622                to_delete.extend(subscription.drain().map(|item| {
623                    MonitoredItemRef::new(
624                        MonitoredItemHandle {
625                            subscription_id: sub_id,
626                            monitored_item_id: item.1.id(),
627                        },
628                        item.1.item_to_monitor().node_id.clone(),
629                        item.1.item_to_monitor().attribute_id,
630                    )
631                }))
632            }
633
634            if subscription.ready_to_remove() {
635                self.subscriptions.remove(&sub_id);
636                self.retransmission_queue
637                    .retain(|f| f.subscription_id != sub_id);
638            }
639        }
640
641        let num_responses = responses.len();
642        for (idx, (publish_request, notification, subscription_id)) in
643            responses.into_iter().enumerate()
644        {
645            let is_last = idx == num_responses - 1;
646
647            if self.retransmission_queue.len() >= self.max_publish_requests() * 2 {
648                self.retransmission_queue.pop_front();
649            }
650            self.retransmission_queue.push_back(NonAckedPublish {
651                message: notification.clone(),
652                subscription_id,
653            });
654
655            // Take note of the available sequence numbers after we have added the NonAckedPublish
656            // to the list. This makes sure that the available sequence numbers list is not empty and contains
657            // the NonAckedPublish we just added.
658            let available_sequence_numbers = self.available_sequence_numbers(subscription_id);
659
660            let _ = publish_request.response.send(
661                PublishResponse {
662                    response_header: ResponseHeader::new_timestamped_service_result(
663                        DateTime::from(*now),
664                        &publish_request.request.request_header,
665                        StatusCode::Good,
666                    ),
667                    subscription_id,
668                    available_sequence_numbers,
669                    // Only set more_notifications on the last publish response.
670                    more_notifications: is_last && more_notifications,
671                    notification_message: notification,
672                    results: publish_request.ack_results,
673                    diagnostic_infos: None,
674                }
675                .into(),
676            );
677        }
678
679        to_delete
680    }
681
682    fn find_notification_message(
683        &self,
684        subscription_id: u32,
685        sequence_number: u32,
686    ) -> Result<NotificationMessage, StatusCode> {
687        if !self.subscriptions.contains_key(&subscription_id) {
688            return Err(StatusCode::BadSubscriptionIdInvalid);
689        }
690        let Some(notification) = self.retransmission_queue.iter().find(|m| {
691            m.subscription_id == subscription_id && m.message.sequence_number == sequence_number
692        }) else {
693            return Err(StatusCode::BadMessageNotAvailable);
694        };
695        Ok(notification.message.clone())
696    }
697
698    fn remove_expired_publish_requests(&mut self, now: Instant) {
699        let mut idx = 0;
700        while idx < self.publish_request_queue.len() {
701            if self.publish_request_queue[idx].deadline < now {
702                let req = self.publish_request_queue.remove(idx).unwrap();
703                let _ = req.response.send(
704                    ServiceFault::new(&req.request.request_header, StatusCode::BadTimeout).into(),
705                );
706            } else {
707                idx += 1;
708            }
709        }
710    }
711
712    fn process_subscription_acks(&mut self, request: &PublishRequest) -> Option<Vec<StatusCode>> {
713        let acks = request.subscription_acknowledgements.as_ref()?;
714        if acks.is_empty() {
715            return None;
716        }
717
718        Some(
719            acks.iter()
720                .map(|ack| {
721                    if !self.subscriptions.contains_key(&ack.subscription_id) {
722                        StatusCode::BadSubscriptionIdInvalid
723                    } else if let Some((idx, _)) =
724                        self.retransmission_queue.iter().enumerate().find(|(_, p)| {
725                            p.subscription_id == ack.subscription_id
726                                && p.message.sequence_number == ack.sequence_number
727                        })
728                    {
729                        // This is potentially innefficient, but this is probably fine due to two factors:
730                        //  - we need unordered removal, _and_ ordered removal, which means we need to deal
731                        //    with this anyway.
732                        //  - The queue is likely to be short, and the element to be removed is likely to be the
733                        //    first.
734                        self.retransmission_queue.remove(idx);
735                        StatusCode::Good
736                    } else {
737                        StatusCode::BadSequenceNumberUnknown
738                    }
739                })
740                .collect(),
741        )
742    }
743
744    /// Returns the array of available sequence numbers in the retransmission queue for the specified subscription
745    pub(super) fn available_sequence_numbers(&self, subscription_id: u32) -> Option<Vec<u32>> {
746        if self.retransmission_queue.is_empty() {
747            return None;
748        }
749        // Find the notifications matching this subscription id in the retransmission queue
750        let sequence_numbers: Vec<u32> = self
751            .retransmission_queue
752            .iter()
753            .filter(|&k| k.subscription_id == subscription_id)
754            .map(|k| k.message.sequence_number)
755            .collect();
756        if sequence_numbers.is_empty() {
757            None
758        } else {
759            Some(sequence_numbers)
760        }
761    }
762
763    pub(super) fn notify_data_changes(&mut self, values: Vec<(MonitoredItemHandle, DataValue)>) {
764        let now = DateTime::now();
765        for (handle, value) in values {
766            let Some(sub) = self.subscriptions.get_mut(&handle.subscription_id) else {
767                continue;
768            };
769            sub.notify_data_value(&handle.monitored_item_id, value, &now);
770        }
771    }
772
773    pub(super) fn notify_events(&mut self, events: Vec<(MonitoredItemHandle, &dyn Event)>) {
774        // Only get the inner type tree if we need to, for performance.
775        let mut lck = None;
776        for (handle, event) in events {
777            let Some(sub) = self.subscriptions.get_mut(&handle.subscription_id) else {
778                continue;
779            };
780            let type_tree = lck.get_or_insert_with(|| self.type_tree_for_user.get_type_tree());
781            sub.notify_event(&handle.monitored_item_id, event, type_tree.get());
782        }
783    }
784
785    pub(super) fn user_token(&self) -> &PersistentSessionKey {
786        &self.user_token
787    }
788
789    pub(super) fn get_monitored_item_count(&self, subscription_id: u32) -> Option<usize> {
790        self.subscriptions.get(&subscription_id).map(|s| s.len())
791    }
792
793    /// Get a reference to the session this subscription collection is owned by.
794    pub fn session(&self) -> &Arc<RwLock<Session>> {
795        &self.session
796    }
797}