opcua_server/subscriptions/
mod.rs

1mod monitored_item;
2mod notify;
3mod session_subscriptions;
4mod subscription;
5
6use std::{hash::Hash, sync::Arc, time::Instant};
7
8use chrono::Utc;
9use hashbrown::{Equivalent, HashMap};
10pub use monitored_item::{CreateMonitoredItem, MonitoredItem};
11use opcua_core::{trace_read_lock, trace_write_lock, ResponseMessage};
12use opcua_nodes::{Event, TypeTree};
13pub use session_subscriptions::SessionSubscriptions;
14use subscription::TickReason;
15pub use subscription::{MonitoredItemHandle, Subscription, SubscriptionState};
16use tracing::error;
17
18pub use notify::{
19    SubscriptionDataNotifier, SubscriptionDataNotifierBatch, SubscriptionEventNotifier,
20    SubscriptionEventNotifierBatch,
21};
22
23use opcua_core::sync::{Mutex, RwLock};
24
25use opcua_types::{
26    node_id::{IdentifierRef, NodeIdRef},
27    AttributeId, CreateSubscriptionRequest, CreateSubscriptionResponse, DataEncoding, DataValue,
28    DateTimeUtc, MessageSecurityMode, ModifySubscriptionRequest, ModifySubscriptionResponse,
29    MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoringMode, NodeId,
30    NotificationMessage, NumericRange, PublishRequest, RepublishRequest, RepublishResponse,
31    ResponseHeader, SetPublishingModeRequest, SetPublishingModeResponse, StatusCode,
32    TimestampsToReturn, TransferResult, TransferSubscriptionsRequest,
33    TransferSubscriptionsResponse,
34};
35
36use super::{
37    authenticator::UserToken,
38    info::ServerInfo,
39    node_manager::{MonitoredItemRef, MonitoredItemUpdateRef, RequestContext, ServerContext},
40    session::instance::Session,
41    SubscriptionLimits,
42};
43
/// Owned key identifying the target of a monitored item: the node being
/// monitored and the attribute monitored on it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MonitoredItemKey {
    /// ID of the monitored node.
    id: NodeId,
    /// Attribute monitored on the node.
    attribute_id: AttributeId,
}
49
/// Borrowed counterpart of [`MonitoredItemKey`], used for map lookups
/// without cloning the `NodeId`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MonitoredItemKeyRef<T: IdentifierRef> {
    /// Borrowed ID of the monitored node.
    id: NodeIdRef<T>,
    /// Attribute monitored on the node.
    attribute_id: AttributeId,
}
55
/// Manual `Hash` impl. This must produce the same hash as the derived impl on
/// [`MonitoredItemKey`] (fields hashed in declaration order) for the
/// `Equivalent`-based lookups in the monitored item map to work.
impl<T> Hash for MonitoredItemKeyRef<T>
where
    T: IdentifierRef,
{
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Same field order as the derived `Hash` on `MonitoredItemKey`.
        self.id.hash(state);
        self.attribute_id.hash(state);
    }
}
65
/// Lets a borrowed [`MonitoredItemKeyRef`] be used to look up entries keyed by
/// an owned [`MonitoredItemKey`], avoiding a `NodeId` clone per lookup.
impl<T: IdentifierRef> Equivalent<MonitoredItemKey> for MonitoredItemKeyRef<T> {
    fn equivalent(&self, key: &MonitoredItemKey) -> bool {
        self.id == key.id && self.attribute_id == key.attribute_id
    }
}
71
/// A basic description of the monitoring parameters for a monitored item, used
/// for conditional sampling.
pub struct MonitoredItemEntry {
    /// Whether the monitored item is currently active.
    /// Items in `MonitoringMode::Disabled` have this set to `false`.
    pub enabled: bool,
    /// The data encoding of the monitored item.
    pub data_encoding: DataEncoding,
    /// The index range of the monitored item.
    pub index_range: NumericRange,
}
82
/// Interior state of the [`SubscriptionCache`], guarded by a single `RwLock`.
struct SubscriptionCacheInner {
    /// Map from session ID to subscription cache.
    /// Each session's state has its own mutex so sessions can tick independently.
    session_subscriptions: HashMap<u32, Arc<Mutex<SessionSubscriptions>>>,
    /// Map from subscription ID to session ID.
    subscription_to_session: HashMap<u32, u32>,
    /// Map from notifier node ID and attribute to monitored item handles,
    /// used to route notifications and drive conditional sampling.
    monitored_items: HashMap<MonitoredItemKey, HashMap<MonitoredItemHandle, MonitoredItemEntry>>,
}
91
/// Structure storing all subscriptions and monitored items on the server.
/// Used to notify users of changes.
///
/// Subscriptions can outlive sessions, and sessions can outlive connections,
/// so neither can be owned by the connection. This provides convenient methods for
/// manipulating subscriptions.
pub struct SubscriptionCache {
    /// All mutable state, behind one read/write lock.
    inner: RwLock<SubscriptionCacheInner>,
    /// Configured limits on subscriptions.
    limits: SubscriptionLimits,
}
103
104impl SubscriptionCache {
105    pub(crate) fn new(limits: SubscriptionLimits) -> Self {
106        Self {
107            inner: RwLock::new(SubscriptionCacheInner {
108                session_subscriptions: HashMap::new(),
109                subscription_to_session: HashMap::new(),
110                monitored_items: HashMap::new(),
111            }),
112            limits,
113        }
114    }
115
116    /// Get the `SessionSubscriptions` object for a single session by its numeric ID.
117    pub fn get_session_subscriptions(
118        &self,
119        session_id: u32,
120    ) -> Option<Arc<Mutex<SessionSubscriptions>>> {
121        let inner = trace_read_lock!(self.inner);
122        inner.session_subscriptions.get(&session_id).cloned()
123    }
124
    /// This is the periodic subscription tick where we check for
    /// triggered subscriptions.
    ///
    /// Ticks every session's subscriptions, removes sessions that are ready
    /// to delete, and cleans up monitored items released during the tick.
    pub(crate) async fn periodic_tick(&self, context: &ServerContext) {
        // TODO: Look into replacing this with a smarter system, in theory it should be possible to
        // always just sleep for the exact time until the next expired publish request, which could
        // be more efficient, and would be more responsive.
        let mut to_delete = Vec::new();
        let mut items_to_delete = Vec::new();
        {
            let now = Utc::now();
            let now_instant = Instant::now();
            // Only a read lock is needed here: each session's state is behind
            // its own mutex.
            let lck = trace_read_lock!(self.inner);
            for (session_id, sub) in lck.session_subscriptions.iter() {
                let mut sub_lck = sub.lock();
                // Collect whatever `tick` releases; these refs are handed to
                // the node managers for deletion below.
                items_to_delete.push((
                    sub_lck.session().clone(),
                    sub_lck.tick(&now, now_instant, TickReason::TickTimerFired),
                ));
                if sub_lck.is_ready_to_delete() {
                    to_delete.push(*session_id);
                }
            }
        }
        // Upgrade to a write lock only when there is something to remove.
        if !to_delete.is_empty() {
            let mut lck = trace_write_lock!(self.inner);
            for id in to_delete {
                lck.session_subscriptions.remove(&id);
            }
            context
                .info
                .diagnostics
                .set_current_subscription_count(lck.subscription_to_session.len() as u32);
        }
        if !items_to_delete.is_empty() {
            // Async cleanup; runs after all locks above are released.
            Self::delete_expired_monitored_items(context, items_to_delete).await;
        }
    }
163
    /// Remove expired monitored items from their owning node managers.
    ///
    /// `items_to_delete` pairs each session with the monitored item refs its
    /// subscriptions released during a tick.
    async fn delete_expired_monitored_items(
        context: &ServerContext,
        items_to_delete: Vec<(Arc<RwLock<Session>>, Vec<MonitoredItemRef>)>,
    ) {
        for (session, items) in items_to_delete {
            // Create a local request context, since we need to call delete monitored items.

            let (id, token) = {
                let lck = session.read();
                let Some(token) = lck.user_token() else {
                    error!("Active session missing user token, this should be impossible");
                    continue;
                };

                (lck.session_id_numeric(), token.clone())
            };
            let ctx = RequestContext {
                session,
                session_id: id,
                authenticator: context.authenticator.clone(),
                token,
                current_node_manager_index: 0,
                type_tree: context.type_tree.clone(),
                subscriptions: context.subscriptions.clone(),
                info: context.info.clone(),
                type_tree_getter: context.type_tree_getter.clone(),
            };

            // Each node manager is only given the items it owns.
            for mgr in context.node_managers.iter() {
                let owned: Vec<_> = items
                    .iter()
                    .filter(|n| mgr.owns_node(n.node_id()))
                    .collect();

                if owned.is_empty() {
                    continue;
                }

                mgr.delete_monitored_items(&ctx, &owned).await;
            }
        }
    }
206
207    pub(crate) fn get_monitored_item_count(
208        &self,
209        session_id: u32,
210        subscription_id: u32,
211    ) -> Option<usize> {
212        let cache = ({
213            let lck = trace_read_lock!(self.inner);
214            lck.session_subscriptions.get(&session_id).cloned()
215        })?;
216        let cache_lck = cache.lock();
217        cache_lck.get_monitored_item_count(subscription_id)
218    }
219
    /// Create a new subscription for `session_id`, creating the per-session
    /// `SessionSubscriptions` container on first use, and register the new
    /// subscription in the subscription-to-session map.
    pub(crate) fn create_subscription(
        &self,
        session_id: u32,
        request: &CreateSubscriptionRequest,
        context: &RequestContext,
    ) -> Result<CreateSubscriptionResponse, StatusCode> {
        // Write lock held for the whole call, keeping `subscription_to_session`
        // consistent with the newly created subscription.
        let mut lck = trace_write_lock!(self.inner);
        let cache = lck
            .session_subscriptions
            .entry(session_id)
            .or_insert_with(|| {
                Arc::new(Mutex::new(SessionSubscriptions::new(
                    self.limits,
                    Self::get_key(&context.session),
                    context.session.clone(),
                    context.info.type_tree_getter.get_type_tree_static(context),
                )))
            })
            .clone();
        let mut cache_lck = cache.lock();
        let res = cache_lck.create_subscription(request, &context.info)?;
        lck.subscription_to_session
            .insert(res.subscription_id, session_id);
        // Keep server diagnostics in sync with the current subscription count.
        context
            .info
            .diagnostics
            .set_current_subscription_count(lck.subscription_to_session.len() as u32);
        context.info.diagnostics.inc_subscription_count();
        Ok(res)
    }
250
251    pub(crate) fn modify_subscription(
252        &self,
253        session_id: u32,
254        request: &ModifySubscriptionRequest,
255        info: &ServerInfo,
256    ) -> Result<ModifySubscriptionResponse, StatusCode> {
257        let Some(cache) = ({
258            let lck = trace_read_lock!(self.inner);
259            lck.session_subscriptions.get(&session_id).cloned()
260        }) else {
261            return Err(StatusCode::BadNoSubscription);
262        };
263        let mut cache_lck = cache.lock();
264        cache_lck.modify_subscription(request, info)
265    }
266
267    pub(crate) fn set_publishing_mode(
268        &self,
269        session_id: u32,
270        request: &SetPublishingModeRequest,
271    ) -> Result<SetPublishingModeResponse, StatusCode> {
272        let Some(cache) = ({
273            let lck = trace_read_lock!(self.inner);
274            lck.session_subscriptions.get(&session_id).cloned()
275        }) else {
276            return Err(StatusCode::BadNoSubscription);
277        };
278        let mut cache_lck = cache.lock();
279        cache_lck.set_publishing_mode(request)
280    }
281
282    pub(crate) fn republish(
283        &self,
284        session_id: u32,
285        request: &RepublishRequest,
286    ) -> Result<RepublishResponse, StatusCode> {
287        let Some(cache) = ({
288            let lck = trace_read_lock!(self.inner);
289            lck.session_subscriptions.get(&session_id).cloned()
290        }) else {
291            return Err(StatusCode::BadNoSubscription);
292        };
293        let cache_lck = cache.lock();
294        cache_lck.republish(request)
295    }
296
297    pub(crate) fn enqueue_publish_request(
298        &self,
299        session_id: u32,
300        now: &DateTimeUtc,
301        now_instant: Instant,
302        request: PendingPublish,
303    ) -> Result<(), StatusCode> {
304        let Some(cache) = ({
305            let lck = trace_read_lock!(self.inner);
306            lck.session_subscriptions.get(&session_id).cloned()
307        }) else {
308            return Err(StatusCode::BadNoSubscription);
309        };
310
311        let mut cache_lck = cache.lock();
312        cache_lck.enqueue_publish_request(now, now_instant, request);
313        Ok(())
314    }
315
316    /// Return a notifier for notifying the server of a batch of changes.
317    ///
318    /// Note: This contains a lock, and should _not_ be kept around for long periods of time,
319    /// or held over await points.
320    ///
321    /// The notifier submits notifications only once dropped.
322    ///
323    /// # Example
324    ///
325    /// ```ignore
326    /// let mut notifier = cache.data_notifier();
327    /// for (node_id, data_value) in my_changes {
328    ///     notifier.notify(node_id, AttributeId::Value, value);
329    /// }
330    /// ```
331    pub fn data_notifier<'a>(&'a self) -> SubscriptionDataNotifier<'a> {
332        SubscriptionDataNotifier::new(trace_read_lock!(self.inner))
333    }
334
    /// Return a notifier for notifying the server of a batch of events.
    ///
    /// Note: This contains a lock, and should _not_ be kept around for long periods of time,
    /// or held over await points.
    ///
    /// The notifier submits notifications only once dropped.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut notifier = cache.event_notifier();
    /// for evt in my_evts {
    ///     notifier.notify(emitter_id, evt);
    /// }
    /// ```
    pub fn event_notifier<'a, 'b>(&'a self) -> SubscriptionEventNotifier<'a, 'b> {
        // NOTE(review): `'b` is unconstrained here and chosen by the caller —
        // presumably the lifetime of the event references passed to `notify`;
        // confirm against `notify.rs`.
        SubscriptionEventNotifier::new(trace_read_lock!(self.inner))
    }
353
354    /// Notify any listening clients about a list of data changes.
355    /// This can be called any time anything changes on the server, or only for values with
356    /// an existing monitored item. Either way this method will deal with distributing the values
357    /// to the appropriate monitored items.
358    pub fn notify_data_change<'a>(
359        &self,
360        items: impl Iterator<Item = (DataValue, &'a NodeId, AttributeId)>,
361    ) {
362        let mut notif = self.data_notifier();
363        for (dv, node_id, attribute_id) in items {
364            notif.notify(node_id, attribute_id, dv);
365        }
366    }
367
368    /// Notify with a dynamic sampler, to avoid getting values for nodes that
369    /// may not have monitored items.
370    /// This is potentially much more efficient than simply notifying blindly, but is
371    /// also somewhat harder to use.
372    pub fn maybe_notify<'a>(
373        &self,
374        items: impl Iterator<Item = (&'a NodeId, AttributeId)>,
375        sample: impl Fn(&NodeId, AttributeId, &NumericRange, &DataEncoding) -> Option<DataValue>,
376    ) {
377        let mut notif = self.data_notifier();
378        for (id, attribute_id) in items {
379            if let Some(mut batch) = notif.notify_for(id, attribute_id) {
380                for (handle, entry) in batch.entries() {
381                    if let Some(value) =
382                        sample(id, attribute_id, &entry.index_range, &entry.data_encoding)
383                    {
384                        batch.data_value_to_item(value, handle);
385                    }
386                }
387            }
388        }
389    }
390
391    /// Notify listening clients to events. Without a custom node manager implementing
392    /// event history, this is the only way to report events in the server.
393    pub fn notify_events<'a>(&self, items: impl Iterator<Item = (&'a dyn Event, &'a NodeId)>) {
394        let mut notif = self.event_notifier();
395        for (evt, id) in items {
396            notif.notify(id, evt);
397        }
398    }
399
    /// Create monitored items on a subscription, registering each successfully
    /// created item in the global monitored item map used for notification
    /// routing and conditional sampling.
    ///
    /// Returns `BadNoSubscription` if the session has no subscription state.
    pub(crate) fn create_monitored_items(
        &self,
        session_id: u32,
        subscription_id: u32,
        requests: &[CreateMonitoredItem],
    ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.create_monitored_items(subscription_id, requests);
        if let Ok(res) = &result {
            // Pair each request with its result; assumes the result vector is
            // positionally aligned with `requests`.
            for (create, res) in requests.iter().zip(res.iter()) {
                if res.status_code.is_good() {
                    let key = MonitoredItemKey {
                        id: create.item_to_monitor().node_id.clone(),
                        attribute_id: create.item_to_monitor().attribute_id,
                    };

                    let index_range = create.item_to_monitor().index_range.clone();

                    lck.monitored_items.entry(key).or_default().insert(
                        create.handle(),
                        MonitoredItemEntry {
                            // Disabled items are registered but not sampled.
                            enabled: !matches!(create.monitoring_mode(), MonitoringMode::Disabled),
                            index_range,
                            data_encoding: create.item_to_monitor().data_encoding.clone(),
                        },
                    );
                }
            }
        }

        result
    }
437
438    pub(crate) fn modify_monitored_items(
439        &self,
440        session_id: u32,
441        subscription_id: u32,
442        info: &ServerInfo,
443        timestamps_to_return: TimestampsToReturn,
444        requests: Vec<MonitoredItemModifyRequest>,
445        type_tree: &dyn TypeTree,
446    ) -> Result<Vec<MonitoredItemUpdateRef>, StatusCode> {
447        let Some(cache) = ({
448            let lck = trace_read_lock!(self.inner);
449            lck.session_subscriptions.get(&session_id).cloned()
450        }) else {
451            return Err(StatusCode::BadNoSubscription);
452        };
453
454        let mut cache_lck = cache.lock();
455        cache_lck.modify_monitored_items(
456            subscription_id,
457            info,
458            timestamps_to_return,
459            requests,
460            type_tree,
461        )
462    }
463
464    fn get_key(session: &RwLock<Session>) -> PersistentSessionKey {
465        let lck = trace_read_lock!(session);
466        PersistentSessionKey::new(
467            lck.user_token().unwrap(),
468            lck.message_security_mode(),
469            lck.application_description().application_uri.as_ref(),
470        )
471    }
472
    /// Set the monitoring mode of a batch of monitored items, updating the
    /// cached `MonitoredItemEntry::enabled` flag for each item that succeeded.
    ///
    /// Returns `BadNoSubscription` if the session has no subscription state.
    pub(crate) fn set_monitoring_mode(
        &self,
        session_id: u32,
        subscription_id: u32,
        monitoring_mode: MonitoringMode,
        items: Vec<u32>,
    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.set_monitoring_mode(subscription_id, monitoring_mode, items);

        if let Ok(res) = &result {
            for (status, rf) in res {
                if status.is_good() {
                    // Borrowed key avoids cloning the node ID for the lookup.
                    let key = MonitoredItemKeyRef {
                        id: rf.node_id().into(),
                        attribute_id: rf.attribute(),
                    };
                    if let Some(it) = lck
                        .monitored_items
                        .get_mut(&key)
                        .and_then(|it| it.get_mut(&rf.handle()))
                    {
                        // Any mode other than `Disabled` counts as enabled.
                        it.enabled = !matches!(monitoring_mode, MonitoringMode::Disabled);
                    }
                }
            }
        }
        result
    }
507
508    pub(crate) fn set_triggering(
509        &self,
510        session_id: u32,
511        subscription_id: u32,
512        triggering_item_id: u32,
513        links_to_add: Vec<u32>,
514        links_to_remove: Vec<u32>,
515    ) -> Result<(Vec<StatusCode>, Vec<StatusCode>), StatusCode> {
516        let Some(cache) = ({
517            let lck = trace_read_lock!(self.inner);
518            lck.session_subscriptions.get(&session_id).cloned()
519        }) else {
520            return Err(StatusCode::BadNoSubscription);
521        };
522
523        let mut cache_lck = cache.lock();
524        cache_lck.set_triggering(
525            subscription_id,
526            triggering_item_id,
527            links_to_add,
528            links_to_remove,
529        )
530    }
531
    /// Delete a batch of monitored items from a subscription, pruning the
    /// corresponding entries from the global monitored item map.
    ///
    /// Returns `BadNoSubscription` if the session has no subscription state.
    pub(crate) fn delete_monitored_items(
        &self,
        session_id: u32,
        subscription_id: u32,
        items: &[u32],
    ) -> Result<Vec<(StatusCode, MonitoredItemRef)>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };

        let mut cache_lck = cache.lock();
        let result = cache_lck.delete_monitored_items(subscription_id, items);
        if let Ok(res) = &result {
            for (status, rf) in res {
                if status.is_good() {
                    // Borrowed key avoids cloning the node ID for the lookup.
                    let key = MonitoredItemKeyRef {
                        id: rf.node_id().into(),
                        attribute_id: rf.attribute(),
                    };
                    if let Some(it) = lck.monitored_items.get_mut(&key) {
                        it.remove(&rf.handle());
                    }
                }
            }
        }
        result
    }
560
    /// Delete a batch of subscriptions, returning the monitored items that
    /// belonged to them so callers can clean them up in the node managers.
    ///
    /// Returns `BadNoSubscription` if the session has no subscription state.
    pub(crate) fn delete_subscriptions(
        &self,
        session_id: u32,
        ids: &[u32],
        info: &ServerInfo,
    ) -> Result<Vec<(StatusCode, Vec<MonitoredItemRef>)>, StatusCode> {
        let mut lck = trace_write_lock!(self.inner);
        let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else {
            return Err(StatusCode::BadNoSubscription);
        };
        let mut cache_lck = cache.lock();
        for id in ids {
            // Only unlink subscriptions this session actually owns.
            if cache_lck.contains(*id) {
                lck.subscription_to_session.remove(id);
            }
        }
        info.diagnostics
            .set_current_subscription_count(lck.subscription_to_session.len() as u32);
        let result = cache_lck.delete_subscriptions(ids);

        for (status, item_res) in &result {
            if !status.is_good() {
                continue;
            }

            for rf in item_res {
                // NOTE(review): only `EventNotifier` entries are pruned from
                // `monitored_items` here, while `delete_monitored_items` prunes
                // every attribute. Verify that data-change entries are cleaned
                // up elsewhere, otherwise this leaks map entries.
                if rf.attribute() == AttributeId::EventNotifier {
                    let key = MonitoredItemKeyRef {
                        id: rf.node_id().into(),
                        attribute_id: rf.attribute(),
                    };
                    if let Some(it) = lck.monitored_items.get_mut(&key) {
                        it.remove(&rf.handle());
                    }
                }
            }
        }

        Ok(result)
    }
601
602    pub(crate) fn get_session_subscription_ids(&self, session_id: u32) -> Vec<u32> {
603        let Some(cache) = ({
604            let lck = trace_read_lock!(self.inner);
605            lck.session_subscriptions.get(&session_id).cloned()
606        }) else {
607            return Vec::new();
608        };
609
610        let cache_lck = cache.lock();
611        cache_lck.subscription_ids()
612    }
613
    /// Transfer subscriptions from their current owning sessions to the
    /// session in `context` (the OPC UA `TransferSubscriptions` service).
    pub(crate) fn transfer(
        &self,
        req: &TransferSubscriptionsRequest,
        context: &RequestContext,
    ) -> TransferSubscriptionsResponse {
        // Start every requested ID as invalid; entries are upgraded below as
        // transfers succeed or fail for a more specific reason.
        let mut results: Vec<_> = req
            .subscription_ids
            .iter()
            .flatten()
            .map(|id| {
                (
                    *id,
                    TransferResult {
                        status_code: StatusCode::BadSubscriptionIdInvalid,
                        available_sequence_numbers: None,
                    },
                )
            })
            .collect();

        let key = Self::get_key(&context.session);
        {
            let mut lck = trace_write_lock!(self.inner);
            // Make sure the receiving session has a subscription container.
            let session_subs = lck
                .session_subscriptions
                .entry(context.session_id)
                .or_insert_with(|| {
                    Arc::new(Mutex::new(SessionSubscriptions::new(
                        self.limits,
                        key.clone(),
                        context.session.clone(),
                        context.info.type_tree_getter.get_type_tree_static(context),
                    )))
                })
                .clone();
            let mut session_subs_lck = session_subs.lock();

            for (sub_id, res) in &mut results {
                let Some(current_owner_session_id) = lck.subscription_to_session.get(sub_id) else {
                    continue;
                };
                // Transfer to the session that already owns it: report success
                // without moving anything.
                if context.session_id == *current_owner_session_id {
                    res.status_code = StatusCode::Good;
                    res.available_sequence_numbers =
                        session_subs_lck.available_sequence_numbers(*sub_id);
                    continue;
                }

                let Some(session_cache) = lck
                    .session_subscriptions
                    .get(current_owner_session_id)
                    .cloned()
                else {
                    // Should be impossible.
                    continue;
                };

                let mut session_lck = session_cache.lock();

                // Only an equivalent user may take over the subscription.
                if !session_lck.user_token().is_equivalent_for_transfer(&key) {
                    res.status_code = StatusCode::BadUserAccessDenied;
                    continue;
                }

                if let (Some(sub), notifs) = session_lck.remove(*sub_id) {
                    tracing::debug!(
                        "Transfer subscription {} to session {}",
                        sub.id(),
                        context.session_id
                    );
                    res.status_code = StatusCode::Good;
                    res.available_sequence_numbers =
                        Some(notifs.iter().map(|n| n.message.sequence_number).collect());

                    // On insertion failure, roll back by returning the
                    // subscription and its unacked messages to the old session.
                    if let Err((e, sub, notifs)) = session_subs_lck.insert(sub, notifs) {
                        res.status_code = e;
                        let _ = session_lck.insert(sub, notifs);
                    } else {
                        if req.send_initial_values {
                            if let Some(sub) = session_subs_lck.get_mut(*sub_id) {
                                sub.set_resend_data();
                            }
                        }
                        // Only after a successful insert is ownership updated.
                        lck.subscription_to_session
                            .insert(*sub_id, context.session_id);
                    }
                }
            }
        }

        TransferSubscriptionsResponse {
            response_header: ResponseHeader::new_good(&req.request_header),
            results: Some(results.into_iter().map(|r| r.1).collect()),
            diagnostic_infos: None,
        }
    }
710}
711
/// A publish request that has been received but not yet responded to.
pub(crate) struct PendingPublish {
    /// Channel used to send the response once the request is handled.
    pub response: tokio::sync::oneshot::Sender<ResponseMessage>,
    /// The original publish request.
    pub request: Box<PublishRequest>,
    /// Acknowledgement results to include in the eventual response.
    pub ack_results: Option<Vec<StatusCode>>,
    /// Deadline after which the pending request expires.
    pub deadline: Instant,
}
718
/// A notification message that has not yet been acknowledged by the client.
struct NonAckedPublish {
    /// The notification message itself.
    message: NotificationMessage,
    /// ID of the subscription that produced the message.
    subscription_id: u32,
}
723
/// Key identifying the effective user of a session, used to decide whether
/// subscriptions may be transferred between sessions.
#[derive(Debug, Clone)]
struct PersistentSessionKey {
    /// The user token the session was activated with.
    token: UserToken,
    /// Security mode of the session's channel.
    security_mode: MessageSecurityMode,
    /// Application URI of the client application.
    application_uri: String,
}
730
731impl PersistentSessionKey {
732    fn new(token: &UserToken, security_mode: MessageSecurityMode, application_uri: &str) -> Self {
733        Self {
734            token: token.clone(),
735            security_mode,
736            application_uri: application_uri.to_owned(),
737        }
738    }
739
740    fn is_equivalent_for_transfer(&self, other: &PersistentSessionKey) -> bool {
741        if self.token.is_anonymous() {
742            other.token.is_anonymous()
743                && matches!(
744                    other.security_mode,
745                    MessageSecurityMode::Sign | MessageSecurityMode::SignAndEncrypt
746                )
747                && self.application_uri == other.application_uri
748        } else {
749            other.token == self.token
750        }
751    }
752}