//! Subscription cache for the OPC-UA server: storage and management of
//! subscriptions and monitored items (`opcua_server/subscriptions/mod.rs`).
1mod monitored_item;
2mod notify;
3mod session_subscriptions;
4mod subscription;
5
6use std::{hash::Hash, sync::Arc, time::Instant};
7
8use chrono::Utc;
9use hashbrown::{Equivalent, HashMap};
10pub use monitored_item::{CreateMonitoredItem, MonitoredItem};
11use opcua_core::{trace_read_lock, trace_write_lock, ResponseMessage};
12use opcua_nodes::{Event, TypeTree};
13pub use session_subscriptions::SessionSubscriptions;
14use subscription::TickReason;
15pub use subscription::{MonitoredItemHandle, Subscription, SubscriptionState};
16use tracing::error;
17
18pub use notify::{
19    SubscriptionDataNotifier, SubscriptionDataNotifierBatch, SubscriptionEventNotifier,
20    SubscriptionEventNotifierBatch,
21};
22
23use opcua_core::sync::{Mutex, RwLock};
24
25use opcua_types::{
26    node_id::{IdentifierRef, NodeIdRef},
27    AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataEncoding, DataValue,
28    DateTimeUtc, MessageSecurityMode, ModifySubscriptionRequest, ModifySubscriptionResponse,
29    MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoringMode, NodeId,
30    NotificationMessage, NumericRange, PublishRequest, RepublishRequest, RepublishResponse,
31    ResponseHeader, SetPublishingModeRequest, SetPublishingModeResponse, StatusCode,
32    TimestampsToReturn, TransferResult, TransferSubscriptionsRequest,
33    TransferSubscriptionsResponse,
34};
35
36use crate::node_manager::RequestContextInner;
37
38use super::{
39    authenticator::UserToken,
40    info::ServerInfo,
41    node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, RequestContext, ServerContext},
42    session::instance::Session,
43    SubscriptionLimits,
44};
45
/// Owned key identifying a monitored item target: the monitored node and the
/// attribute being sampled on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MonitoredItemKey {
    /// ID of the monitored node.
    id: NodeId,
    /// Attribute being monitored on the node.
    attribute_id: AttributeId,
}
51
/// Borrowed counterpart of [`MonitoredItemKey`], used to look up entries in
/// the monitored item map without cloning the node ID.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MonitoredItemKeyRef<T: IdentifierRef> {
    /// Borrowed reference to the monitored node ID.
    id: NodeIdRef<T>,
    /// Attribute being monitored on the node.
    attribute_id: AttributeId,
}
57
impl<T> Hash for MonitoredItemKeyRef<T>
where
    T: IdentifierRef,
{
    // Hash the fields in the same order as the derived `Hash` on
    // `MonitoredItemKey`, so that `Equivalent`-based lookups land in the same
    // bucket. NOTE(review): this also relies on `NodeIdRef`'s hash matching
    // `NodeId`'s — confirm in opcua_types.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.attribute_id.hash(state);
    }
}
67
/// Lets a borrowed `MonitoredItemKeyRef` be used to look up entries keyed by
/// an owned `MonitoredItemKey` in the hashbrown map, without allocating.
impl<T: IdentifierRef> Equivalent<MonitoredItemKey> for MonitoredItemKeyRef<T> {
    fn equivalent(&self, key: &MonitoredItemKey) -> bool {
        self.id == key.id && self.attribute_id == key.attribute_id
    }
}
73
/// A basic description of the monitoring parameters for a monitored item, used
/// for conditional sampling.
pub struct MonitoredItemEntry {
    /// Whether the monitored item is currently active.
    /// `false` exactly when the item's monitoring mode is `Disabled`.
    pub enabled: bool,
    /// The data encoding of the monitored item.
    pub data_encoding: DataEncoding,
    /// The index range of the monitored item.
    pub index_range: NumericRange,
}
84
/// Inner, lock-protected state of the [`SubscriptionCache`].
struct SubscriptionCacheInner {
    /// Map from session ID to subscription cache
    session_subscriptions: HashMap<u32, Arc<Mutex<SessionSubscriptions>>>,
    /// Map from subscription ID to session ID.
    subscription_to_session: HashMap<u32, u32>,
    /// Map from notifier node ID to monitored item handles.
    /// Entries are added on successful monitored item creation and removed
    /// when items or their subscriptions are deleted.
    monitored_items: HashMap<MonitoredItemKey, HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
}
93
/// Structure storing all subscriptions and monitored items on the server.
/// Used to notify users of changes.
///
/// Subscriptions can outlive sessions, and sessions can outlive connections,
/// so neither can be owned by the connection. This provides convenient methods for
/// manipulating subscriptions.
pub struct SubscriptionCache {
    /// Shared state: per-session subscription stores, the subscription-to-session
    /// index, and the registry of monitored items.
    inner: RwLock<SubscriptionCacheInner>,
    /// Configured limits on subscriptions.
    limits: SubscriptionLimits,
}
105
106impl SubscriptionCache {
107    pub(crate) fn new(limits: SubscriptionLimits) -> Self {
108        Self {
109            inner: RwLock::new(SubscriptionCacheInner {
110                session_subscriptions: HashMap::new(),
111                subscription_to_session: HashMap::new(),
112                monitored_items: HashMap::new(),
113            }),
114            limits,
115        }
116    }
117
118    /// Get the `SessionSubscriptions` object for a single session by its numeric ID.
119    pub fn get_session_subscriptions(
120        &self,
121        session_id: u32,
122    ) -> Option<Arc<Mutex<SessionSubscriptions>>> {
123        let inner = trace_read_lock!(self.inner);
124        inner.session_subscriptions.get(&session_id).cloned()
125    }
126
    /// This is the periodic subscription tick where we check for
    /// triggered subscriptions.
    ///
    pub(crate) async fn periodic_tick(&self, context: &ServerContext) {
        // TODO: Look into replacing this with a smarter system, in theory it should be possible to
        // always just sleep for the exact time until the next expired publish request, which could
        // be more efficient, and would be more responsive.
        let mut to_delete = Vec::new();
        let mut items_to_delete = Vec::new();
        {
            // Tick every session's subscriptions under the read lock only.
            // Removal needs the write lock, so sessions that are ready to be
            // deleted are collected and removed in a second pass below.
            let now = Utc::now();
            let now_instant = Instant::now();
            let lck = trace_read_lock!(self.inner);
            for (session_id, sub) in lck.session_subscriptions.iter() {
                let mut sub_lck = sub.lock();
                // A tick may expire monitored items; keep them paired with the
                // owning session so they can be cleaned up further down.
                items_to_delete.push((
                    sub_lck.session().clone(),
                    sub_lck.tick(&now, now_instant, TickReason::TickTimerFired),
                ));
                if sub_lck.is_ready_to_delete() {
                    to_delete.push(*session_id);
                }
            }
        }
        if !to_delete.is_empty() {
            let mut lck = trace_write_lock!(self.inner);
            for id in to_delete {
                lck.session_subscriptions.remove(&id);
            }
            context
                .info
                .diagnostics
                .set_current_subscription_count(lck.subscription_to_session.len() as u32);
        }
        // Deleting expired monitored items calls into the node managers, which
        // is async, so it must happen after all inner locks are released.
        if !items_to_delete.is_empty() {
            Self::delete_expired_monitored_items(context, items_to_delete).await;
        }
    }
165
    /// Tell the node managers to delete a batch of expired monitored items,
    /// grouped by the session that owned them.
    async fn delete_expired_monitored_items(
        context: &ServerContext,
        items_to_delete: Vec<(Arc<RwLock<Session>>, Vec<MonitoredItemRef>)>,
    ) {
        for (session, items) in items_to_delete {
            // Create a local request context, since we need to call delete monitored items.

            let (id, token) = {
                let lck = session.read();
                // Sessions without a token are skipped rather than panicking,
                // since this runs on a background tick with no caller to fail.
                let Some(token) = lck.user_token() else {
                    error!("Active session missing user token, this should be impossible");
                    continue;
                };

                (lck.session_id_numeric(), token.clone())
            };
            let ctx = RequestContext {
                current_node_manager_index: 0,
                inner: Arc::new(RequestContextInner {
                    session,
                    session_id: id,
                    authenticator: context.authenticator.clone(),
                    token,
                    type_tree: context.type_tree.clone(),
                    subscriptions: context.subscriptions.clone(),
                    info: context.info.clone(),
                    type_tree_getter: context.type_tree_getter.clone(),
                }),
            };

            // Fan the deleted items out so each node manager only sees the
            // nodes it owns.
            for mgr in context.node_managers.iter() {
                let owned: Vec<_> = items
                    .iter()
                    .filter(|n| mgr.owns_node(n.node_id()))
                    .collect();

                if owned.is_empty() {
                    continue;
                }

                mgr.delete_monitored_items(&ctx, &owned).await;
            }
        }
    }
210
211    pub(crate) fn get_monitored_item_count(
212        &self,
213        session_id: u32,
214        subscription_id: u32,
215    ) -> Option<usize> {
216        let cache = ({
217            let lck = trace_read_lock!(self.inner);
218            lck.session_subscriptions.get(&session_id).cloned()
219        })?;
220        let cache_lck = cache.lock();
221        cache_lck.get_monitored_item_count(subscription_id)
222    }
223
    /// Create a new subscription for the session given by `session_id`,
    /// lazily creating that session's subscription store on first use.
    pub(crate) fn create_subscription(
        &self,
        session_id: u32,
        request: &CreateSubscriptionRequest,
        context: &RequestContext,
    ) -> Result<CreateSubscriptionResponse, StatusCode> {
        // The write lock is held across the whole call so the new subscription
        // ID is registered in `subscription_to_session` before anyone else can
        // observe the inner state.
        let mut lck = trace_write_lock!(self.inner);
        let cache = lck
            .session_subscriptions
            .entry(session_id)
            .or_insert_with(|| {
                Arc::new(Mutex::new(SessionSubscriptions::new(
                    self.limits,
                    Self::get_key(&context.session),
                    context.session.clone(),
                    context.info.type_tree_getter.get_type_tree_static(context),
                )))
            })
            .clone();
        let mut cache_lck = cache.lock();
        let res = cache_lck.create_subscription(request, &context.info)?;
        lck.subscription_to_session
            .insert(res.subscription_id, session_id);
        // Keep the diagnostics counters in sync with the subscription index.
        context
            .info
            .diagnostics
            .set_current_subscription_count(lck.subscription_to_session.len() as u32);
        context.info.diagnostics.inc_subscription_count();
        Ok(res)
    }
254
255    pub(crate) fn modify_subscription(
256        &self,
257        session_id: u32,
258        request: &ModifySubscriptionRequest,
259        info: &ServerInfo,
260    ) -> Result<ModifySubscriptionResponse, StatusCode> {
261        let Some(cache) = ({
262            let lck = trace_read_lock!(self.inner);
263            lck.session_subscriptions.get(&session_id).cloned()
264        }) else {
265            return Err(StatusCode::BadNoSubscription);
266        };
267        let mut cache_lck = cache.lock();
268        cache_lck.modify_subscription(request, info)
269    }
270
271    pub(crate) fn set_publishing_mode(
272        &self,
273        session_id: u32,
274        request: &SetPublishingModeRequest,
275    ) -> Result<SetPublishingModeResponse, StatusCode> {
276        let Some(cache) = ({
277            let lck = trace_read_lock!(self.inner);
278            lck.session_subscriptions.get(&session_id).cloned()
279        }) else {
280            return Err(StatusCode::BadNoSubscription);
281        };
282        let mut cache_lck = cache.lock();
283        cache_lck.set_publishing_mode(request)
284    }
285
286    pub(crate) fn republish(
287        &self,
288        session_id: u32,
289        request: &RepublishRequest,
290    ) -> Result<RepublishResponse, StatusCode> {
291        let Some(cache) = ({
292            let lck = trace_read_lock!(self.inner);
293            lck.session_subscriptions.get(&session_id).cloned()
294        }) else {
295            return Err(StatusCode::BadNoSubscription);
296        };
297        let cache_lck = cache.lock();
298        cache_lck.republish(request)
299    }
300
301    pub(crate) fn enqueue_publish_request(
302        &self,
303        session_id: u32,
304        now: &DateTimeUtc,
305        now_instant: Instant,
306        request: PendingPublish,
307    ) -> Result<(), StatusCode> {
308        let Some(cache) = ({
309            let lck = trace_read_lock!(self.inner);
310            lck.session_subscriptions.get(&session_id).cloned()
311        }) else {
312            return Err(StatusCode::BadNoSubscription);
313        };
314
315        let mut cache_lck = cache.lock();
316        cache_lck.enqueue_publish_request(now, now_instant, request);
317        Ok(())
318    }
319
320    /// Return a notifier for notifying the server of a batch of changes.
321    ///
322    /// Note: This contains a lock, and should _not_ be kept around for long periods of time,
323    /// or held over await points.
324    ///
325    /// The notifier submits notifications only once dropped.
326    ///
327    /// # Example
328    ///
329    /// ```ignore
330    /// let mut notifier = cache.data_notifier();
331    /// for (node_id, data_value) in my_changes {
332    ///     notifier.notify(node_id, AttributeId::Value, value);
333    /// }
334    /// ```
335    pub fn data_notifier<'a>(&'a self) -> SubscriptionDataNotifier<'a> {
336        SubscriptionDataNotifier::new(trace_read_lock!(self.inner))
337    }
338
    /// Return a notifier for notifying the server of a batch of events.
    ///
    /// Note: This contains a lock, and should _not_ be kept around for long periods of time,
    /// or held over await points.
    ///
    /// The notifier submits notifications only once dropped.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut notifier = cache.event_notifier();
    /// for evt in my_evts {
    ///     notifier.notify(emitter_id, evt);
    /// }
    /// ```
    // `'a` is the borrow of this cache; `'b` is free and chosen by the caller —
    // presumably the lifetime of the event references passed to `notify`.
    // NOTE(review): confirm against `notify::SubscriptionEventNotifier`.
    pub fn event_notifier<'a, 'b>(&'a self) -> SubscriptionEventNotifier<'a, 'b> {
        SubscriptionEventNotifier::new(trace_read_lock!(self.inner))
    }
357
358    /// Notify any listening clients about a list of data changes.
359    /// This can be called any time anything changes on the server, or only for values with
360    /// an existing monitored item. Either way this method will deal with distributing the values
361    /// to the appropriate monitored items.
362    pub fn notify_data_change<'a>(
363        &self,
364        items: impl Iterator<Item = (DataValue, &'a NodeId, AttributeId)>,
365    ) {
366        let mut notif = self.data_notifier();
367        for (dv, node_id, attribute_id) in items {
368            notif.notify(node_id, attribute_id, dv);
369        }
370    }
371
372    /// Notify with a dynamic sampler, to avoid getting values for nodes that
373    /// may not have monitored items.
374    /// This is potentially much more efficient than simply notifying blindly, but is
375    /// also somewhat harder to use.
376    pub fn maybe_notify<'a>(
377        &self,
378        items: impl Iterator<Item = (&'a NodeId, AttributeId)>,
379        sample: impl Fn(&NodeId, AttributeId, &NumericRange, &DataEncoding) -> Option<DataValue>,
380    ) {
381        let mut notif = self.data_notifier();
382        for (id, attribute_id) in items {
383            if let Some(mut batch) = notif.notify_for(id, attribute_id) {
384                for (handle, entry) in batch.entries() {
385                    if let Some(value) =
386                        sample(id, attribute_id, &entry.index_range, &entry.data_encoding)
387                    {
388                        batch.data_value_to_item(value, handle);
389                    }
390                }
391            }
392        }
393    }
394
395    /// Notify listening clients to events. Without a custom node manager implementing
396    /// event history, this is the only way to report events in the server.
397    pub fn notify_events<'a>(&self, items: impl Iterator<Item = (&'a dyn Event, &'a NodeId)>) {
398        let mut notif = self.event_notifier();
399        for (evt, id) in items {
400            notif.notify(id, evt);
401        }
402    }
403
    /// Create monitored items on subscription `subscription_id` for the session
    /// given by `session_id`, registering each successfully created item in the
    /// global monitored item map so notifications can reach it.
    pub(crate) fn create_monitored_items(
        &self,
        session_id: u32,
        subscription_id: u32,
        requests: &[CreateMonitoredItem],
    ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.create_monitored_items(subscription_id, requests);
        if let Ok(res) = &result {
            // Results come back one per request, in order, so zip pairs each
            // request with its outcome.
            for (create, res) in requests.iter().zip(res.iter()) {
                if res.status_code.is_good() {
                    let key = MonitoredItemKey {
                        id: create.item_to_monitor().node_id.clone(),
                        attribute_id: create.item_to_monitor().attribute_id,
                    };

                    let index_range = create.item_to_monitor().index_range.clone();

                    lck.monitored_items.entry(key).or_default().insert(
                        create.handle(),
                        MonitoredItemEntry {
                            // Only `Disabled` suppresses sampling.
                            enabled: !matches!(create.monitoring_mode(), MonitoringMode::Disabled),
                            index_range,
                            data_encoding: create.item_to_monitor().data_encoding.clone(),
                        },
                    );
                }
            }
        }

        result
    }
441
442    pub(crate) fn modify_monitored_items(
443        &self,
444        session_id: u32,
445        subscription_id: u32,
446        info: &ServerInfo,
447        timestamps_to_return: TimestampsToReturn,
448        requests: Vec<MonitoredItemModifyRequest>,
449        type_tree: &dyn TypeTree,
450    ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
451        let Some(cache) = ({
452            let lck = trace_read_lock!(self.inner);
453            lck.session_subscriptions.get(&session_id).cloned()
454        }) else {
455            return Err(StatusCode::BadNoSubscription);
456        };
457
458        let mut cache_lck = cache.lock();
459        cache_lck.modify_monitored_items(
460            subscription_id,
461            info,
462            timestamps_to_return,
463            requests,
464            type_tree,
465        )
466    }
467
468    fn get_key(session: &RwLock<Session>) -> PersistentSessionKey {
469        let lck = trace_read_lock!(session);
470        PersistentSessionKey::new(
471            lck.user_token().unwrap(),
472            lck.message_security_mode(),
473            lck.application_description().application_uri.as_ref(),
474        )
475    }
476
    /// Set the monitoring mode of the monitored items in `items` on
    /// subscription `subscription_id`, mirroring the new enabled/disabled
    /// state into the global monitored item map.
    pub(crate) fn set_monitoring_mode(
        &self,
        session_id: u32,
        subscription_id: u32,
        monitoring_mode: MonitoringMode,
        items: Vec<u32>,
    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.set_monitoring_mode(subscription_id, monitoring_mode, items);

        if let Ok(res) = &result {
            for (status, rf) in res {
                if status.is_good() {
                    // Borrowed key avoids cloning the node ID for the lookup.
                    let key = MonitoredItemKeyRef {
                        id: rf.node_id().into(),
                        attribute_id: rf.attribute(),
                    };
                    if let Some(it) = lck
                        .monitored_items
                        .get_mut(&key)
                        .and_then(|it| it.get_mut(&rf.handle()))
                    {
                        // Only `Disabled` suppresses sampling; `Sampling` and
                        // `Reporting` both count as enabled here.
                        it.enabled = !matches!(monitoring_mode, MonitoringMode::Disabled);
                    }
                }
            }
        }
        result
    }
511
512    pub(crate) fn set_triggering(
513        &self,
514        session_id: u32,
515        subscription_id: u32,
516        triggering_item_id: u32,
517        links_to_add: Vec<u32>,
518        links_to_remove: Vec<u32>,
519    ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
520        let Some(cache) = ({
521            let lck = trace_read_lock!(self.inner);
522            lck.session_subscriptions.get(&session_id).cloned()
523        }) else {
524            return Err(StatusCode::BadNoSubscription);
525        };
526
527        let mut cache_lck = cache.lock();
528        cache_lck.set_triggering(
529            subscription_id,
530            triggering_item_id,
531            links_to_add,
532            links_to_remove,
533        )
534    }
535
    /// Delete monitored items by handle from subscription `subscription_id`,
    /// removing each successfully deleted item from the global monitored item
    /// map as well.
    pub(crate) fn delete_monitored_items(
        &self,
        session_id: u32,
        subscription_id: u32,
        items: &[u32],
    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.delete_monitored_items(subscription_id, items);
        if let Ok(res) = &result {
            for (status, rf) in res {
                if status.is_good() {
                    let key = MonitoredItemKeyRef {
                        id: rf.node_id().into(),
                        attribute_id: rf.attribute(),
                    };
                    // Note: the outer map entry is kept even if its handle map
                    // becomes empty; only the handle entry is removed.
                    if let Some(it) = lck.monitored_items.get_mut(&key) {
                        it.remove(&rf.handle());
                    }
                }
            }
        }
        result
    }
564
565    pub(crate) fn delete_subscriptions(
566        &self,
567        session_id: u32,
568        ids: &[u32],
569        info: &ServerInfo,
570    ) -> Result<Vec<(StatusCode, Vec<MonitoredItemRef>)>, StatusCode> {
571        let mut lck = trace_write_lock!(self.inner);
572        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
573            return Err(StatusCode::BadNoSubscription);
574        };
575        let mut cache_lck = cache.lock();
576        for id in ids {
577            if cache_lck.contains(*id) {
578                lck.subscription_to_session.remove(id);
579            }
580        }
581        info.diagnostics
582            .set_current_subscription_count(lck.subscription_to_session.len() as u32);
583        let result = cache_lck.delete_subscriptions(ids);
584
585        for (status, item_res) in &result {
586            if !status.is_good() {
587                continue;
588            }
589
590            for rf in item_res {
591                if rf.attribute() == AttributeId::EventNotifier {
592                    let key = MonitoredItemKeyRef {
593                        id: rf.node_id().into(),
594                        attribute_id: rf.attribute(),
595                    };
596                    if let Some(it) = lck.monitored_items.get_mut(&key) {
597                        it.remove(&rf.handle());
598                    }
599                }
600            }
601        }
602
603        Ok(result)
604    }
605
606    pub(crate) fn get_session_subscription_ids(&self, session_id: u32) -> Vec<u32> {
607        let Some(cache) = ({
608            let lck = trace_read_lock!(self.inner);
609            lck.session_subscriptions.get(&session_id).cloned()
610        }) else {
611            return Vec::new();
612        };
613
614        let cache_lck = cache.lock();
615        cache_lck.subscription_ids()
616    }
617
    /// Implementation of the `TransferSubscriptions` service: move the
    /// requested subscriptions from the sessions that currently own them to
    /// the session in `context`.
    pub(crate) fn transfer(
        &self,
        req: &TransferSubscriptionsRequest,
        context: &RequestContext,
    ) -> TransferSubscriptionsResponse {
        // Start every result as BadSubscriptionIdInvalid; results are upgraded
        // below as each subscription is found and transferred.
        let mut results: Vec<_> = req
            .subscription_ids
            .iter()
            .flatten()
            .map(|id| {
                (
                    *id,
                    TransferResult {
                        status_code: StatusCode::BadSubscriptionIdInvalid,
                        available_sequence_numbers: None,
                    },
                )
            })
            .collect();

        let key = Self::get_key(&context.session);
        {
            let mut lck = trace_write_lock!(self.inner);
            // Ensure the target session has a subscription store to receive
            // the transferred subscriptions, creating one if needed.
            let session_subs = lck
                .session_subscriptions
                .entry(context.session_id)
                .or_insert_with(|| {
                    Arc::new(Mutex::new(SessionSubscriptions::new(
                        self.limits,
                        key.clone(),
                        context.session.clone(),
                        context.info.type_tree_getter.get_type_tree_static(context),
                    )))
                })
                .clone();
            let mut session_subs_lck = session_subs.lock();

            for (sub_id, res) in &mut results {
                let Some(current_owner_session_id) = lck.subscription_to_session.get(sub_id) else {
                    continue;
                };
                // Transferring to the session that already owns the
                // subscription is a no-op success.
                if context.session_id == *current_owner_session_id {
                    res.status_code = StatusCode::Good;
                    res.available_sequence_numbers =
                        session_subs_lck.available_sequence_numbers(*sub_id);
                    continue;
                }

                let Some(session_cache) = lck
                    .session_subscriptions
                    .get(current_owner_session_id)
                    .cloned()
                else {
                    // Should be impossible.
                    continue;
                };

                let mut session_lck = session_cache.lock();

                // Only an equivalent user may take over another session's
                // subscription.
                if !session_lck.user_token().is_equivalent_for_transfer(&key) {
                    res.status_code = StatusCode::BadUserAccessDenied;
                    continue;
                }

                if let (Some(sub), notifs) = session_lck.remove(*sub_id) {
                    tracing::debug!(
                        "Transfer subscription {} to session {}",
                        sub.id(),
                        context.session_id
                    );
                    res.status_code = StatusCode::Good;
                    res.available_sequence_numbers =
                        Some(notifs.iter().map(|n| n.message.sequence_number).collect());

                    // If insertion into the new session fails, roll the
                    // subscription (and its unacked notifications) back to the
                    // old session and report the error.
                    if let Err((e, sub, notifs)) = session_subs_lck.insert(sub, notifs) {
                        res.status_code = e;
                        let _ = session_lck.insert(sub, notifs);
                    } else {
                        if req.send_initial_values {
                            if let Some(sub) = session_subs_lck.get_mut(*sub_id) {
                                sub.set_resend_data();
                            }
                        }
                        // Re-point the subscription index at the new owner.
                        lck.subscription_to_session
                            .insert(*sub_id, context.session_id);
                    }
                }
            }
        }

        TransferSubscriptionsResponse {
            response_header: ResponseHeader::new_good(&req.request_header),
            results: Some(results.into_iter().map(|r| r.1).collect()),
            diagnostic_infos: None,
        }
    }
714}
715
/// A publish request that has been received but not yet responded to.
pub(crate) struct PendingPublish {
    /// One-shot channel used to send the eventual response back.
    pub response: tokio::sync::oneshot::Sender<ResponseMessage>,
    /// The original publish request. Boxed since publish requests are large.
    pub request: Box<PublishRequest>,
    // NOTE(review): presumably per-acknowledgement status codes for the
    // subscription acknowledgements in the request — confirm at the call site.
    pub ack_results: Option<Vec<StatusCode>>,
    /// Deadline after which this pending request expires.
    pub deadline: Instant,
}
722
/// A notification message published to a client but not yet acknowledged.
/// Its sequence number is reported as "available" during subscription transfer.
struct NonAckedPublish {
    /// The published notification message.
    message: NotificationMessage,
    /// ID of the subscription the message belongs to.
    subscription_id: u32,
}
727
/// Key identifying the user that owns a session's subscriptions, used to
/// decide whether subscriptions may be transferred between sessions.
#[derive(Debug, Clone)]
struct PersistentSessionKey {
    /// The user's identity token.
    token: UserToken,
    /// Security mode of the connection the key was created on.
    security_mode: MessageSecurityMode,
    /// Application URI of the client that created the session.
    application_uri: String,
}
734
735impl PersistentSessionKey {
736    fn new(token: &UserToken, security_mode: MessageSecurityMode, application_uri: &str) -> Self {
737        Self {
738            token: token.clone(),
739            security_mode,
740            application_uri: application_uri.to_owned(),
741        }
742    }
743
744    fn is_equivalent_for_transfer(&self, other: &PersistentSessionKey) -> bool {
745        if self.token.is_anonymous() {
746            other.token.is_anonymous()
747                && matches!(
748                    other.security_mode,
749                    MessageSecurityMode::Sign | MessageSecurityMode::SignAndEncrypt
750                )
751                && self.application_uri == other.application_uri
752        } else {
753            other.token == self.token
754        }
755    }
756}