opcua_server/
session.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4use std::{
5    collections::{HashMap, HashSet, VecDeque},
6    sync::{
7        atomic::{AtomicI32, Ordering},
8        Arc, RwLock,
9    },
10};
11
12use chrono::Utc;
13
14use opcua_crypto::X509;
15use opcua_types::{service_types::PublishRequest, status_code::StatusCode, *};
16
17use crate::{
18    address_space::{AddressSpace, UserAccessLevel},
19    continuation_point::BrowseContinuationPoint,
20    diagnostics::ServerDiagnostics,
21    identity_token::IdentityToken,
22    session_diagnostics::SessionDiagnostics,
23    state::ServerState,
24    subscriptions::subscription::TickReason,
25    subscriptions::subscriptions::Subscriptions,
26};
27
28/// Session info is meant to hold information about a session created by the CreateSession service; the struct currently declares no fields.
29#[derive(Clone)]
30pub struct SessionInfo {}
31
/// Publish request timeout handed to `Subscriptions::new` whenever a session is constructed.
/// NOTE(review): the unit is not visible in this file - presumably milliseconds (30 s); confirm
/// against `Subscriptions`.
32const PUBLISH_REQUEST_TIMEOUT: i64 = 30000;
33
34lazy_static! {
35    static ref NEXT_SESSION_ID: AtomicI32 = AtomicI32::new(1);
36}
37
38fn next_session_id() -> NodeId {
39    // Session id will be a string identifier
40    let session_id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
41    let session_id = format!("Session-{}", session_id);
42    NodeId::new(1, session_id)
43}
44
/// The kinds of user identity token distinguished by the server.
///
/// NOTE(review): nothing in this file constructs or matches on this enum; `Session` stores an
/// `IdentityToken` instead - confirm where this type is used.
45pub enum ServerUserIdentityToken {
    /// No payload. Presumably means that no identity token was supplied - TODO confirm.
46    Empty,
    /// No payload. Presumably an anonymous identity - TODO confirm.
47    AnonymousIdentityToken,
    /// Carries a `UserIdentityToken`.
    /// NOTE(review): the variant name suggests a user name token but the payload type is
    /// `UserIdentityToken` - confirm this is intended.
48    UserNameIdentityToken(UserIdentityToken),
    /// Carries an `X509IdentityToken`.
49    X509IdentityToken(X509IdentityToken),
    /// Carries the raw `ExtensionObject`. Presumably the object that could not be interpreted
    /// as a known token - TODO confirm.
50    Invalid(ExtensionObject),
51}
52
53pub struct SessionManager {
54    pub sessions: HashMap<NodeId, Arc<RwLock<Session>>>,
55    pub sessions_terminated: bool,
56}
57
58impl Default for SessionManager {
59    fn default() -> Self {
60        Self {
61            sessions: HashMap::new(),
62            sessions_terminated: false,
63        }
64    }
65}
66
67impl SessionManager {
68    pub fn len(&self) -> usize {
69        self.sessions.len()
70    }
71
72    pub fn first(&self) -> Option<Arc<RwLock<Session>>> {
73        self.sessions.iter().next().map(|(_, s)| s.clone())
74    }
75
76    pub fn sessions_terminated(&self) -> bool {
77        self.sessions_terminated
78    }
79
80    /// Puts all sessions into a terminated state and clears the map
81    pub fn clear(&mut self) {
82        self.sessions.iter().for_each(|s| {
83            let mut session = trace_write_lock!(s.1);
84            session.set_terminated();
85        });
86        self.sessions.clear();
87    }
88
89    /// Find a session by its session id and return it.
90    pub fn find_session_by_id(&self, session_id: &NodeId) -> Option<Arc<RwLock<Session>>> {
91        self.sessions
92            .iter()
93            .find(|s| {
94                let session = trace_read_lock!(s.1);
95                session.session_id() == session_id
96            })
97            .map(|s| s.1)
98            .cloned()
99    }
100
101    /// Finds the session by its authentication token and returns it. The authentication token
102    /// can be renewed so  it is not used as a key.
103    pub fn find_session_by_token(
104        &self,
105        authentication_token: &NodeId,
106    ) -> Option<Arc<RwLock<Session>>> {
107        self.sessions
108            .iter()
109            .find(|s| {
110                let session = trace_read_lock!(s.1);
111                session.authentication_token() == authentication_token
112            })
113            .map(|s| s.1)
114            .cloned()
115    }
116
117    /// Register the session in the map so it can be searched on
118    pub fn register_session(&mut self, session: Arc<RwLock<Session>>) {
119        let session_id = {
120            let session = trace_read_lock!(session);
121            session.session_id().clone()
122        };
123        self.sessions.insert(session_id, session);
124    }
125
126    /// Deregisters a session from the map
127    pub fn deregister_session(
128        &mut self,
129        session: Arc<RwLock<Session>>,
130    ) -> Option<Arc<RwLock<Session>>> {
131        let session = trace_read_lock!(session);
132        let result = self.sessions.remove(session.session_id());
133        self.sessions_terminated = self.sessions.is_empty();
134        result
135    }
136}
137
138/// The Session holds the state maintained between one client and the server.
139pub struct Session {
140    /// The session identifier, assigned once at construction by `next_session_id()`
141    session_id: NodeId,
142    /// Security policy URI; empty until `set_security_policy_uri` is called
143    security_policy_uri: String,
144    /// Client's certificate; `None` until `set_client_certificate` supplies one
145    client_certificate: Option<X509>,
146    /// Authentication token for the session; starts as the null `NodeId` and can be replaced later
147    authentication_token: NodeId,
148    /// Session nonce; starts as the null `ByteString`
149    session_nonce: ByteString,
150    /// Session name (supplied by client)
151    session_name: UAString,
152    /// Session timeout, initially 0 (presumably milliseconds - TODO confirm the unit)
153    session_timeout: f64,
154    /// User identity token; starts as `IdentityToken::None` and feeds `client_user_id()`
155    user_identity: IdentityToken,
156    /// Session's preferred locale ids, if any were set
157    locale_ids: Option<Vec<UAString>>,
158    /// Negotiated max request message size (this file only ever writes it)
159    max_request_message_size: u32,
160    /// Negotiated max response message size (this file only ever writes it)
161    max_response_message_size: u32,
162    /// Endpoint url for this session
163    endpoint_url: UAString,
164    /// Maximum number of browse continuation points, taken from `constants::MAX_BROWSE_CONTINUATION_POINTS`
165    max_browse_continuation_points: usize,
166    /// Browse continuation points (oldest to newest); the oldest are evicted when the limit is reached
167    browse_continuation_points: VecDeque<BrowseContinuationPoint>,
168    /// Diagnostics associated with the server; notified when the session is created and when it is dropped
169    diagnostics: Arc<RwLock<ServerDiagnostics>>,
170    /// Diagnostics associated with the session
171    session_diagnostics: Arc<RwLock<SessionDiagnostics>>,
172    /// Indicates if the session has received an ActivateSession
173    activated: bool,
174    /// Flag to indicate session should be terminated; set by `terminate_session()`, read by `is_session_terminated()`
175    terminate_session: bool,
176    /// Time that session was terminated, helps with recovering sessions, or clearing them out; holds the construction time until `set_terminated()` runs
177    terminated_at: DateTimeUtc,
178    /// Flag indicating session is actually terminated; set by `set_terminated()`
179    terminated: bool,
180    /// Flag indicating broadly if this session may modify the address space by adding or removing
181    /// nodes or references to nodes.
182    can_modify_address_space: bool,
183    /// Timestamp of the last service request to have happened (only counts service requests while there is a session)
184    last_service_request_timestamp: DateTimeUtc,
185    /// Subscriptions associated with the session
186    subscriptions: Subscriptions,
187}
188
189impl Drop for Session {
190    fn drop(&mut self) {
191        info!("Session is being dropped");
192        let mut diagnostics = trace_write_lock!(self.diagnostics);
193        diagnostics.on_destroy_session(self);
194    }
195}
196
197impl Session {
198    #[cfg(test)]
199    pub fn new_no_certificate_store() -> Session {
200        let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
201        let session = Session {
202            subscriptions: Subscriptions::new(100, PUBLISH_REQUEST_TIMEOUT),
203            session_id: next_session_id(),
204            activated: false,
205            terminate_session: false,
206            terminated: false,
207            terminated_at: chrono::Utc::now(),
208            client_certificate: None,
209            security_policy_uri: String::new(),
210            authentication_token: NodeId::null(),
211            session_nonce: ByteString::null(),
212            session_name: UAString::null(),
213            session_timeout: 0f64,
214            user_identity: IdentityToken::None,
215            locale_ids: None,
216            max_request_message_size: 0,
217            max_response_message_size: 0,
218            endpoint_url: UAString::null(),
219            max_browse_continuation_points,
220            browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
221            can_modify_address_space: true,
222            diagnostics: Arc::new(RwLock::new(ServerDiagnostics::default())),
223            session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
224            last_service_request_timestamp: Utc::now(),
225        };
226
227        {
228            let mut diagnostics = trace_write_lock!(session.diagnostics);
229            diagnostics.on_create_session(&session);
230        }
231        session
232    }
233
234    /// Create a `Session` from a `Server`
235    pub fn new(server_state: Arc<RwLock<ServerState>>) -> Session {
236        let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
237
238        let server_state = trace_read_lock!(server_state);
239        let max_subscriptions = server_state.max_subscriptions;
240        let diagnostics = server_state.diagnostics.clone();
241        let can_modify_address_space = {
242            let config = trace_read_lock!(server_state.config);
243            config.limits.clients_can_modify_address_space
244        };
245
246        let session = Session {
247            subscriptions: Subscriptions::new(max_subscriptions, PUBLISH_REQUEST_TIMEOUT),
248            session_id: next_session_id(),
249            activated: false,
250            terminate_session: false,
251            terminated: false,
252            terminated_at: chrono::Utc::now(),
253            client_certificate: None,
254            security_policy_uri: String::new(),
255            authentication_token: NodeId::null(),
256            session_nonce: ByteString::null(),
257            session_name: UAString::null(),
258            session_timeout: 0f64,
259            user_identity: IdentityToken::None,
260            locale_ids: None,
261            max_request_message_size: 0,
262            max_response_message_size: 0,
263            endpoint_url: UAString::null(),
264            max_browse_continuation_points,
265            browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
266            can_modify_address_space,
267            diagnostics,
268            session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
269            last_service_request_timestamp: Utc::now(),
270        };
271        {
272            let mut diagnostics = trace_write_lock!(session.diagnostics);
273            diagnostics.on_create_session(&session);
274        }
275        session
276    }
277
278    pub fn session_id(&self) -> &NodeId {
279        &self.session_id
280    }
281
282    pub fn set_activated(&mut self, activated: bool) {
283        self.activated = activated;
284    }
285
286    pub fn is_activated(&self) -> bool {
287        self.activated
288    }
289
290    pub fn is_terminated(&self) -> bool {
291        self.terminated
292    }
293
294    pub fn terminated_at(&self) -> DateTimeUtc {
295        self.terminated_at
296    }
297
298    pub fn set_terminated(&mut self) {
299        info!("Session being set to terminated");
300        self.terminated = true;
301        self.terminated_at = chrono::Utc::now();
302    }
303
304    pub fn authentication_token(&self) -> &NodeId {
305        &self.authentication_token
306    }
307
308    pub fn set_authentication_token(&mut self, authentication_token: NodeId) {
309        self.authentication_token = authentication_token;
310    }
311
312    pub fn session_timeout(&self) -> f64 {
313        self.session_timeout
314    }
315
316    pub fn set_session_timeout(&mut self, session_timeout: f64) {
317        self.session_timeout = session_timeout;
318    }
319
320    pub fn set_max_request_message_size(&mut self, max_request_message_size: u32) {
321        self.max_request_message_size = max_request_message_size;
322    }
323
324    pub fn set_max_response_message_size(&mut self, max_response_message_size: u32) {
325        self.max_response_message_size = max_response_message_size;
326    }
327
328    pub fn endpoint_url(&self) -> &UAString {
329        &self.endpoint_url
330    }
331
332    pub fn set_endpoint_url(&mut self, endpoint_url: UAString) {
333        self.endpoint_url = endpoint_url;
334    }
335
336    pub fn set_security_policy_uri(&mut self, security_policy_uri: &str) {
337        self.security_policy_uri = security_policy_uri.to_string();
338    }
339
340    pub fn set_user_identity(&mut self, user_identity: IdentityToken) {
341        self.user_identity = user_identity;
342    }
343
344    pub fn last_service_request_timestamp(&self) -> DateTimeUtc {
345        self.last_service_request_timestamp
346    }
347
348    pub fn set_last_service_request_timestamp(
349        &mut self,
350        last_service_request_timestamp: DateTimeUtc,
351    ) {
352        self.last_service_request_timestamp = last_service_request_timestamp;
353    }
354
355    pub fn locale_ids(&self) -> &Option<Vec<UAString>> {
356        &self.locale_ids
357    }
358
359    pub fn set_locale_ids(&mut self, locale_ids: Option<Vec<UAString>>) {
360        self.locale_ids = locale_ids;
361    }
362
363    pub fn client_certificate(&self) -> &Option<X509> {
364        &self.client_certificate
365    }
366
367    pub fn set_client_certificate(&mut self, client_certificate: Option<X509>) {
368        self.client_certificate = client_certificate;
369    }
370
371    pub fn session_nonce(&self) -> &ByteString {
372        &self.session_nonce
373    }
374
375    pub fn set_session_nonce(&mut self, session_nonce: ByteString) {
376        self.session_nonce = session_nonce;
377    }
378
379    pub fn session_name(&self) -> &UAString {
380        &self.session_name
381    }
382
383    pub fn set_session_name(&mut self, session_name: UAString) {
384        self.session_name = session_name;
385    }
386
387    pub(crate) fn session_diagnostics(&self) -> Arc<RwLock<SessionDiagnostics>> {
388        self.session_diagnostics.clone()
389    }
390
391    pub(crate) fn subscriptions(&self) -> &Subscriptions {
392        &self.subscriptions
393    }
394
395    pub(crate) fn subscriptions_mut(&mut self) -> &mut Subscriptions {
396        &mut self.subscriptions
397    }
398
399    pub(crate) fn enqueue_publish_request(
400        &mut self,
401        now: &DateTimeUtc,
402        request_id: u32,
403        request: PublishRequest,
404        address_space: &AddressSpace,
405    ) -> Result<(), StatusCode> {
406        self.subscriptions
407            .enqueue_publish_request(now, request_id, request, address_space)
408    }
409
410    pub(crate) fn tick_subscriptions(
411        &mut self,
412        now: &DateTimeUtc,
413        address_space: &AddressSpace,
414        reason: TickReason,
415    ) -> Result<(), StatusCode> {
416        self.subscriptions.tick(now, address_space, reason)
417    }
418
419    /// Reset the lifetime counter on the subscription, e.g. because a service references the
420    /// subscription.
421    pub(crate) fn reset_subscription_lifetime_counter(&mut self, subscription_id: u32) {
422        if let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
423            subscription.reset_lifetime_counter();
424        }
425    }
426
427    /// Iterates through the existing queued publish requests and creates a timeout
428    /// publish response any that have expired.
429    pub(crate) fn expire_stale_publish_requests(&mut self, now: &DateTimeUtc) {
430        self.subscriptions.expire_stale_publish_requests(now);
431    }
432
433    pub(crate) fn add_browse_continuation_point(
434        &mut self,
435        continuation_point: BrowseContinuationPoint,
436    ) {
437        // Remove excess browse continuation points
438        while self.browse_continuation_points.len() >= self.max_browse_continuation_points {
439            let continuation_point = self.browse_continuation_points.pop_front();
440            debug!(
441                "Removing old continuation point {} to make way for new one",
442                continuation_point.unwrap().id.as_base64()
443            );
444        }
445        self.browse_continuation_points
446            .push_back(continuation_point);
447    }
448
449    /// Finds and REMOVES a continuation point by id.
450    pub(crate) fn find_browse_continuation_point(
451        &mut self,
452        id: &ByteString,
453    ) -> Option<BrowseContinuationPoint> {
454        if let Some(idx) = self
455            .browse_continuation_points
456            .iter()
457            .position(|continuation_point| continuation_point.id == *id)
458        {
459            self.browse_continuation_points.remove(idx)
460        } else {
461            None
462        }
463    }
464
465    pub(crate) fn remove_expired_browse_continuation_points(
466        &mut self,
467        address_space: &AddressSpace,
468    ) {
469        self.browse_continuation_points.retain(|continuation_point| {
470            let valid = continuation_point.is_valid_browse_continuation_point(address_space);
471            if !valid {
472                debug!("Continuation point {:?} is no longer valid and will be removed, address space last modified = {}", continuation_point, address_space.last_modified());
473            }
474            valid
475        });
476    }
477
478    /// Remove all the specified continuation points by id
479    pub(crate) fn remove_browse_continuation_points(&mut self, continuation_points: &[ByteString]) {
480        // Turn the supplied slice into a set
481        let continuation_points_set: HashSet<ByteString> =
482            continuation_points.iter().cloned().collect();
483        // Now remove any continuation points that are part of that set
484        self.browse_continuation_points
485            .retain(|continuation_point| !continuation_points_set.contains(&continuation_point.id));
486    }
487
488    pub(crate) fn can_modify_address_space(&self) -> bool {
489        self.can_modify_address_space
490    }
491
492    #[cfg(test)]
493    pub(crate) fn set_can_modify_address_space(&mut self, can_modify_address_space: bool) {
494        self.can_modify_address_space = can_modify_address_space;
495    }
496
497    pub(crate) fn effective_user_access_level(
498        &self,
499        user_access_level: UserAccessLevel,
500        _node_id: &NodeId,
501        _attribute_id: AttributeId,
502    ) -> UserAccessLevel {
503        // TODO session could modify the user_access_level further here via user / groups
504        user_access_level
505    }
506
507    /// Helper function to return the client user id from the identity token or None of there is no user id
508    ///
509    /// This conforms to OPC Part 5 6.4.3 ClientUserId
510    pub fn client_user_id(&self) -> UAString {
511        match self.user_identity {
512            IdentityToken::None | IdentityToken::AnonymousIdentityToken(_) => UAString::null(),
513            IdentityToken::UserNameIdentityToken(ref token) => token.user_name.clone(),
514            IdentityToken::X509IdentityToken(ref token) => {
515                if let Ok(cert) = X509::from_byte_string(&token.certificate_data) {
516                    UAString::from(cert.subject_name())
517                } else {
518                    UAString::from("Invalid certificate")
519                }
520            }
521            IdentityToken::Invalid(_) => UAString::from("invalid"),
522        }
523    }
524
525    pub fn is_session_terminated(&self) -> bool {
526        self.terminate_session
527    }
528
529    pub fn terminate_session(&mut self) {
530        self.terminate_session = true;
531    }
532
533    pub(crate) fn register_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
534        let session_diagnostics = trace_read_lock!(self.session_diagnostics);
535        let mut address_space = trace_write_lock!(address_space);
536        session_diagnostics.register_session(self, &mut address_space);
537    }
538
539    pub(crate) fn deregister_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
540        let session_diagnostics = trace_read_lock!(self.session_diagnostics);
541        let mut address_space = trace_write_lock!(address_space);
542        session_diagnostics.deregister_session(self, &mut address_space);
543    }
544}