Skip to main content

opcua/server/
session.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4use std::{
5    collections::{HashMap, HashSet, VecDeque},
6    sync::{
7        atomic::{AtomicI32, Ordering},
8        Arc,
9    },
10};
11
12use chrono::Utc;
13
14use crate::crypto::X509;
15use crate::sync::*;
16use crate::types::{service_types::PublishRequest, status_code::StatusCode, *};
17
18use crate::server::{
19    address_space::{AddressSpace, UserAccessLevel},
20    continuation_point::BrowseContinuationPoint,
21    diagnostics::ServerDiagnostics,
22    identity_token::IdentityToken,
23    session_diagnostics::SessionDiagnostics,
24    state::ServerState,
25    subscriptions::subscription::TickReason,
26    subscriptions::subscriptions::Subscriptions,
27};
28
29/// Session info holds information about a session created by CreateSession service
30#[derive(Clone)]
31pub struct SessionInfo {}
32
33const PUBLISH_REQUEST_TIMEOUT: i64 = 30000;
34
35lazy_static! {
36    static ref NEXT_SESSION_ID: AtomicI32 = AtomicI32::new(1);
37}
38
39fn next_session_id() -> NodeId {
40    // Session id will be a string identifier
41    let session_id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
42    let session_id = format!("Session-{}", session_id);
43    NodeId::new(1, session_id)
44}
45
46pub enum ServerUserIdentityToken {
47    Empty,
48    AnonymousIdentityToken,
49    UserNameIdentityToken(UserIdentityToken),
50    X509IdentityToken(X509IdentityToken),
51    Invalid(ExtensionObject),
52}
53
54pub struct SessionManager {
55    pub sessions: HashMap<NodeId, Arc<RwLock<Session>>>,
56    pub sessions_terminated: bool,
57}
58
59impl Default for SessionManager {
60    fn default() -> Self {
61        Self {
62            sessions: HashMap::new(),
63            sessions_terminated: false,
64        }
65    }
66}
67
68impl SessionManager {
69    pub fn len(&self) -> usize {
70        self.sessions.len()
71    }
72
73    pub fn first(&self) -> Option<Arc<RwLock<Session>>> {
74        self.sessions.iter().next().map(|(_, s)| s.clone())
75    }
76
77    pub fn sessions_terminated(&self) -> bool {
78        self.sessions_terminated
79    }
80
81    /// Puts all sessions into a terminated state, deregisters them, and clears the map
82    pub fn clear(&mut self, address_space: Arc<RwLock<AddressSpace>>) {
83        for (_nodeid, session) in self.sessions.drain() {
84            let mut session = trace_write_lock!(session);
85            session.set_terminated();
86            let mut space = trace_write_lock!(address_space);
87            let diagnostics = trace_write_lock!(session.session_diagnostics);
88            diagnostics.deregister_session(&session, &mut space);
89        }
90    }
91
92    /// Find a session by its session id and return it.
93    pub fn find_session_by_id(&self, session_id: &NodeId) -> Option<Arc<RwLock<Session>>> {
94        self.sessions
95            .iter()
96            .find(|s| {
97                let session = trace_read_lock!(s.1);
98                session.session_id() == session_id
99            })
100            .map(|s| s.1)
101            .cloned()
102    }
103
104    /// Finds the session by its authentication token and returns it. The authentication token
105    /// can be renewed so  it is not used as a key.
106    pub fn find_session_by_token(
107        &self,
108        authentication_token: &NodeId,
109    ) -> Option<Arc<RwLock<Session>>> {
110        self.sessions
111            .iter()
112            .find(|s| {
113                let session = trace_read_lock!(s.1);
114                session.authentication_token() == authentication_token
115            })
116            .map(|s| s.1)
117            .cloned()
118    }
119
120    /// Register the session in the map so it can be searched on
121    pub fn register_session(&mut self, session: Arc<RwLock<Session>>) {
122        let session_id = {
123            let session = trace_read_lock!(session);
124            session.session_id().clone()
125        };
126        self.sessions.insert(session_id, session);
127    }
128
129    /// Deregisters a session from the map
130    pub fn deregister_session(
131        &mut self,
132        session: Arc<RwLock<Session>>,
133    ) -> Option<Arc<RwLock<Session>>> {
134        let session = trace_read_lock!(session);
135        let session_id = session.session_id();
136        debug!(
137            "deregister_session with session id {}, auth token {}",
138            session_id,
139            session.authentication_token()
140        );
141        let result = self.sessions.remove(session_id);
142        debug!(
143            "deregister_session, new session count = {}",
144            self.sessions.len()
145        );
146        self.sessions_terminated = self.sessions.is_empty();
147        result
148    }
149}
150
151/// The Session is any state maintained between the client and server
152pub struct Session {
153    /// The session identifier
154    session_id: NodeId,
155    /// Security policy
156    security_policy_uri: String,
157    /// Secure channel id
158    secure_channel_id: u32,
159    /// Client's certificate
160    client_certificate: Option<X509>,
161    /// Authentication token for the session
162    authentication_token: NodeId,
163    /// Session nonce
164    session_nonce: ByteString,
165    /// Session name (supplied by client)
166    session_name: UAString,
167    /// Session timeout
168    session_timeout: f64,
169    /// User identity token
170    user_identity: IdentityToken,
171    /// Session's preferred locale ids
172    locale_ids: Option<Vec<UAString>>,
173    /// Negotiated max request message size
174    max_request_message_size: u32,
175    /// Negotiated max response message size
176    max_response_message_size: u32,
177    /// Endpoint url for this session
178    endpoint_url: UAString,
179    /// Maximum number of continuation points
180    max_browse_continuation_points: usize,
181    /// Browse continuation points (oldest to newest)
182    browse_continuation_points: VecDeque<BrowseContinuationPoint>,
183    /// Diagnostics associated with the server
184    diagnostics: Arc<RwLock<ServerDiagnostics>>,
185    /// Diagnostics associated with the session
186    session_diagnostics: Arc<RwLock<SessionDiagnostics>>,
187    /// Indicates if the session has received an ActivateSession
188    activated: bool,
189    /// Flag to indicate session should be terminated
190    terminate_session: bool,
191    /// Time that session was terminated, helps with recovering sessions, or clearing them out
192    terminated_at: DateTimeUtc,
193    /// Flag indicating session is actually terminated
194    terminated: bool,
195    /// Flag indicating broadly if this session may modify the address space by adding or removing
196    /// nodes or references to nodes.
197    can_modify_address_space: bool,
198    /// Timestamp of the last service request to have happened (only counts service requests while there is a session)
199    last_service_request_timestamp: DateTimeUtc,
200    /// Subscriptions associated with the session
201    subscriptions: Subscriptions,
202}
203
204impl Drop for Session {
205    fn drop(&mut self) {
206        info!("Session is being dropped");
207        let mut diagnostics = trace_write_lock!(self.diagnostics);
208        diagnostics.on_destroy_session(self);
209    }
210}
211
212impl Session {
213    #[cfg(test)]
214    pub fn new_no_certificate_store() -> Session {
215        let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
216        let session = Session {
217            subscriptions: Subscriptions::new(100, PUBLISH_REQUEST_TIMEOUT),
218            session_id: next_session_id(),
219            secure_channel_id: 0,
220            activated: false,
221            terminate_session: false,
222            terminated: false,
223            terminated_at: chrono::Utc::now(),
224            client_certificate: None,
225            security_policy_uri: String::new(),
226            authentication_token: NodeId::null(),
227            session_nonce: ByteString::null(),
228            session_name: UAString::null(),
229            session_timeout: 0f64,
230            user_identity: IdentityToken::None,
231            locale_ids: None,
232            max_request_message_size: 0,
233            max_response_message_size: 0,
234            endpoint_url: UAString::null(),
235            max_browse_continuation_points,
236            browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
237            can_modify_address_space: true,
238            diagnostics: Arc::new(RwLock::new(ServerDiagnostics::default())),
239            session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
240            last_service_request_timestamp: Utc::now(),
241        };
242
243        {
244            let mut diagnostics = trace_write_lock!(session.diagnostics);
245            diagnostics.on_create_session(&session);
246        }
247        session
248    }
249
250    /// Create a `Session` from a `Server`
251    pub fn new(server_state: Arc<RwLock<ServerState>>) -> Session {
252        let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
253
254        let server_state = trace_read_lock!(server_state);
255        let max_subscriptions = server_state.max_subscriptions;
256        let diagnostics = server_state.diagnostics.clone();
257        let can_modify_address_space = {
258            let config = trace_read_lock!(server_state.config);
259            config.limits.clients_can_modify_address_space
260        };
261
262        let session = Session {
263            subscriptions: Subscriptions::new(max_subscriptions, PUBLISH_REQUEST_TIMEOUT),
264            session_id: next_session_id(),
265            secure_channel_id: 0,
266            activated: false,
267            terminate_session: false,
268            terminated: false,
269            terminated_at: chrono::Utc::now(),
270            client_certificate: None,
271            security_policy_uri: String::new(),
272            authentication_token: NodeId::null(),
273            session_nonce: ByteString::null(),
274            session_name: UAString::null(),
275            session_timeout: 0f64,
276            user_identity: IdentityToken::None,
277            locale_ids: None,
278            max_request_message_size: 0,
279            max_response_message_size: 0,
280            endpoint_url: UAString::null(),
281            max_browse_continuation_points,
282            browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
283            can_modify_address_space,
284            diagnostics,
285            session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
286            last_service_request_timestamp: Utc::now(),
287        };
288        {
289            let mut diagnostics = trace_write_lock!(session.diagnostics);
290            diagnostics.on_create_session(&session);
291        }
292        session
293    }
294
295    pub fn session_id(&self) -> &NodeId {
296        &self.session_id
297    }
298
299    pub fn set_activated(&mut self, activated: bool) {
300        self.activated = activated;
301    }
302
303    pub fn is_activated(&self) -> bool {
304        self.activated
305    }
306
307    pub fn is_terminated(&self) -> bool {
308        self.terminated
309    }
310
311    pub fn terminated_at(&self) -> DateTimeUtc {
312        self.terminated_at
313    }
314
315    pub fn set_terminated(&mut self) {
316        info!("Session being set to terminated");
317        self.terminated = true;
318        self.terminated_at = chrono::Utc::now();
319    }
320
321    pub fn secure_channel_id(&self) -> u32 {
322        self.secure_channel_id
323    }
324
325    pub fn set_secure_channel_id(&mut self, secure_channel_id: u32) {
326        self.secure_channel_id = secure_channel_id;
327    }
328
329    pub fn authentication_token(&self) -> &NodeId {
330        &self.authentication_token
331    }
332
333    pub fn set_authentication_token(&mut self, authentication_token: NodeId) {
334        self.authentication_token = authentication_token;
335    }
336
337    pub fn session_timeout(&self) -> f64 {
338        self.session_timeout
339    }
340
341    pub fn set_session_timeout(&mut self, session_timeout: f64) {
342        self.session_timeout = session_timeout;
343    }
344
345    pub fn set_max_request_message_size(&mut self, max_request_message_size: u32) {
346        self.max_request_message_size = max_request_message_size;
347    }
348
349    pub fn set_max_response_message_size(&mut self, max_response_message_size: u32) {
350        self.max_response_message_size = max_response_message_size;
351    }
352
353    pub fn endpoint_url(&self) -> &UAString {
354        &self.endpoint_url
355    }
356
357    pub fn set_endpoint_url(&mut self, endpoint_url: UAString) {
358        self.endpoint_url = endpoint_url;
359    }
360
361    pub fn set_security_policy_uri(&mut self, security_policy_uri: &str) {
362        self.security_policy_uri = security_policy_uri.to_string();
363    }
364
365    pub fn set_user_identity(&mut self, user_identity: IdentityToken) {
366        self.user_identity = user_identity;
367    }
368
369    pub fn last_service_request_timestamp(&self) -> DateTimeUtc {
370        self.last_service_request_timestamp
371    }
372
373    pub fn set_last_service_request_timestamp(
374        &mut self,
375        last_service_request_timestamp: DateTimeUtc,
376    ) {
377        self.last_service_request_timestamp = last_service_request_timestamp;
378    }
379
380    pub fn locale_ids(&self) -> &Option<Vec<UAString>> {
381        &self.locale_ids
382    }
383
384    pub fn set_locale_ids(&mut self, locale_ids: Option<Vec<UAString>>) {
385        self.locale_ids = locale_ids;
386    }
387
388    pub fn client_certificate(&self) -> &Option<X509> {
389        &self.client_certificate
390    }
391
392    pub fn set_client_certificate(&mut self, client_certificate: Option<X509>) {
393        self.client_certificate = client_certificate;
394    }
395
396    pub fn session_nonce(&self) -> &ByteString {
397        &self.session_nonce
398    }
399
400    pub fn set_session_nonce(&mut self, session_nonce: ByteString) {
401        self.session_nonce = session_nonce;
402    }
403
404    pub fn session_name(&self) -> &UAString {
405        &self.session_name
406    }
407
408    pub fn set_session_name(&mut self, session_name: UAString) {
409        self.session_name = session_name;
410    }
411
412    pub(crate) fn session_diagnostics(&self) -> Arc<RwLock<SessionDiagnostics>> {
413        self.session_diagnostics.clone()
414    }
415
416    pub(crate) fn subscriptions(&self) -> &Subscriptions {
417        &self.subscriptions
418    }
419
420    pub(crate) fn subscriptions_mut(&mut self) -> &mut Subscriptions {
421        &mut self.subscriptions
422    }
423
424    pub(crate) fn enqueue_publish_request(
425        &mut self,
426        now: &DateTimeUtc,
427        request_id: u32,
428        request: PublishRequest,
429        address_space: &AddressSpace,
430    ) -> Result<(), StatusCode> {
431        self.subscriptions
432            .enqueue_publish_request(now, request_id, request, address_space)
433    }
434
435    pub(crate) fn tick_subscriptions(
436        &mut self,
437        now: &DateTimeUtc,
438        address_space: &AddressSpace,
439        reason: TickReason,
440    ) -> Result<(), StatusCode> {
441        self.subscriptions.tick(now, address_space, reason)
442    }
443
444    /// Reset the lifetime counter on the subscription, e.g. because a service references the
445    /// subscription.
446    pub(crate) fn reset_subscription_lifetime_counter(&mut self, subscription_id: u32) {
447        if let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
448            subscription.reset_lifetime_counter();
449        }
450    }
451
452    /// Iterates through the existing queued publish requests and creates a timeout
453    /// publish response any that have expired.
454    pub(crate) fn expire_stale_publish_requests(&mut self, now: &DateTimeUtc) {
455        self.subscriptions.expire_stale_publish_requests(now);
456    }
457
458    pub(crate) fn add_browse_continuation_point(
459        &mut self,
460        continuation_point: BrowseContinuationPoint,
461    ) {
462        // Remove excess browse continuation points
463        while self.browse_continuation_points.len() >= self.max_browse_continuation_points {
464            let continuation_point = self.browse_continuation_points.pop_front();
465            debug!(
466                "Removing old continuation point {} to make way for new one",
467                continuation_point.unwrap().id.as_base64()
468            );
469        }
470        self.browse_continuation_points
471            .push_back(continuation_point);
472    }
473
474    /// Finds and REMOVES a continuation point by id.
475    pub(crate) fn find_browse_continuation_point(
476        &mut self,
477        id: &ByteString,
478    ) -> Option<BrowseContinuationPoint> {
479        if let Some(idx) = self
480            .browse_continuation_points
481            .iter()
482            .position(|continuation_point| continuation_point.id == *id)
483        {
484            self.browse_continuation_points.remove(idx)
485        } else {
486            None
487        }
488    }
489
490    pub(crate) fn remove_expired_browse_continuation_points(
491        &mut self,
492        address_space: &AddressSpace,
493    ) {
494        self.browse_continuation_points.retain(|continuation_point| {
495            let valid = continuation_point.is_valid_browse_continuation_point(address_space);
496            if !valid {
497                debug!("Continuation point {:?} is no longer valid and will be removed, address space last modified = {}", continuation_point, address_space.last_modified());
498            }
499            valid
500        });
501    }
502
503    /// Remove all the specified continuation points by id
504    pub(crate) fn remove_browse_continuation_points(&mut self, continuation_points: &[ByteString]) {
505        // Turn the supplied slice into a set
506        let continuation_points_set: HashSet<ByteString> =
507            continuation_points.iter().cloned().collect();
508        // Now remove any continuation points that are part of that set
509        self.browse_continuation_points
510            .retain(|continuation_point| !continuation_points_set.contains(&continuation_point.id));
511    }
512
513    pub(crate) fn can_modify_address_space(&self) -> bool {
514        self.can_modify_address_space
515    }
516
517    #[cfg(test)]
518    pub(crate) fn set_can_modify_address_space(&mut self, can_modify_address_space: bool) {
519        self.can_modify_address_space = can_modify_address_space;
520    }
521
522    pub(crate) fn effective_user_access_level(
523        &self,
524        user_access_level: UserAccessLevel,
525        _node_id: &NodeId,
526        _attribute_id: AttributeId,
527    ) -> UserAccessLevel {
528        // TODO session could modify the user_access_level further here via user / groups
529        user_access_level
530    }
531
532    /// Helper function to return the client user id from the identity token or None of there is no user id
533    ///
534    /// This conforms to OPC Part 5 6.4.3 ClientUserId
535    pub fn client_user_id(&self) -> UAString {
536        match self.user_identity {
537            IdentityToken::None | IdentityToken::AnonymousIdentityToken(_) => UAString::null(),
538            IdentityToken::UserNameIdentityToken(ref token) => token.user_name.clone(),
539            IdentityToken::X509IdentityToken(ref token) => {
540                if let Ok(cert) = X509::from_byte_string(&token.certificate_data) {
541                    UAString::from(cert.subject_name())
542                } else {
543                    UAString::from("Invalid certificate")
544                }
545            }
546            IdentityToken::Invalid(_) => UAString::from("invalid"),
547        }
548    }
549
550    pub fn is_session_terminated(&self) -> bool {
551        self.terminate_session
552    }
553
554    pub fn terminate_session(&mut self) {
555        self.terminate_session = true;
556    }
557
558    pub(crate) fn register_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
559        let session_diagnostics = trace_read_lock!(self.session_diagnostics);
560        let mut address_space = trace_write_lock!(address_space);
561        session_diagnostics.register_session(self, &mut address_space);
562    }
563
564    pub(crate) fn deregister_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
565        let session_diagnostics = trace_read_lock!(self.session_diagnostics);
566        let mut address_space = trace_write_lock!(address_space);
567        session_diagnostics.deregister_session(self, &mut address_space);
568    }
569}