1use std::{
5 collections::{HashMap, HashSet, VecDeque},
6 sync::{
7 atomic::{AtomicI32, Ordering},
8 Arc,
9 },
10};
11
12use chrono::Utc;
13
14use crate::crypto::X509;
15use crate::sync::*;
16use crate::types::{service_types::PublishRequest, status_code::StatusCode, *};
17
18use crate::server::{
19 address_space::{AddressSpace, UserAccessLevel},
20 continuation_point::BrowseContinuationPoint,
21 diagnostics::ServerDiagnostics,
22 identity_token::IdentityToken,
23 session_diagnostics::SessionDiagnostics,
24 state::ServerState,
25 subscriptions::subscription::TickReason,
26 subscriptions::subscriptions::Subscriptions,
27};
28
/// Empty, cloneable marker type; it carries no data.
// NOTE(review): nothing else in this file refers to `SessionInfo` — confirm it is still needed.
#[derive(Clone)]
pub struct SessionInfo {}
32
/// Publish request timeout handed to `Subscriptions::new` for every session created in this file.
// NOTE(review): presumably expressed in milliseconds (i.e. 30 seconds) — confirm against `Subscriptions`.
const PUBLISH_REQUEST_TIMEOUT: i64 = 30000;
34
// Process-wide counter used by `next_session_id()` to number sessions; the first value
// handed out is 1.
lazy_static! {
    static ref NEXT_SESSION_ID: AtomicI32 = AtomicI32::new(1);
}
38
39fn next_session_id() -> NodeId {
40 let session_id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
42 let session_id = format!("Session-{}", session_id);
43 NodeId::new(1, session_id)
44}
45
/// The kinds of user identity token distinguished by the server.
pub enum ServerUserIdentityToken {
    /// No token is present.
    Empty,
    /// An anonymous identity token; carries no payload.
    AnonymousIdentityToken,
    /// A user name identity token, wrapping the `UserIdentityToken` payload.
    UserNameIdentityToken(UserIdentityToken),
    /// An X509 certificate identity token.
    X509IdentityToken(X509IdentityToken),
    /// A token that is not one of the kinds above; the raw extension object is retained.
    Invalid(ExtensionObject),
}
53
/// Holds the sessions known to the server, shared behind `Arc<RwLock<_>>`.
pub struct SessionManager {
    /// The sessions; `register_session` stores each one under its own session id.
    pub sessions: HashMap<NodeId, Arc<RwLock<Session>>>,
    /// Updated by `deregister_session`: `true` when that call left the map empty.
    pub sessions_terminated: bool,
}
58
59impl Default for SessionManager {
60 fn default() -> Self {
61 Self {
62 sessions: HashMap::new(),
63 sessions_terminated: false,
64 }
65 }
66}
67
68impl SessionManager {
69 pub fn len(&self) -> usize {
70 self.sessions.len()
71 }
72
73 pub fn first(&self) -> Option<Arc<RwLock<Session>>> {
74 self.sessions.iter().next().map(|(_, s)| s.clone())
75 }
76
77 pub fn sessions_terminated(&self) -> bool {
78 self.sessions_terminated
79 }
80
81 pub fn clear(&mut self, address_space: Arc<RwLock<AddressSpace>>) {
83 for (_nodeid, session) in self.sessions.drain() {
84 let mut session = trace_write_lock!(session);
85 session.set_terminated();
86 let mut space = trace_write_lock!(address_space);
87 let diagnostics = trace_write_lock!(session.session_diagnostics);
88 diagnostics.deregister_session(&session, &mut space);
89 }
90 }
91
92 pub fn find_session_by_id(&self, session_id: &NodeId) -> Option<Arc<RwLock<Session>>> {
94 self.sessions
95 .iter()
96 .find(|s| {
97 let session = trace_read_lock!(s.1);
98 session.session_id() == session_id
99 })
100 .map(|s| s.1)
101 .cloned()
102 }
103
104 pub fn find_session_by_token(
107 &self,
108 authentication_token: &NodeId,
109 ) -> Option<Arc<RwLock<Session>>> {
110 self.sessions
111 .iter()
112 .find(|s| {
113 let session = trace_read_lock!(s.1);
114 session.authentication_token() == authentication_token
115 })
116 .map(|s| s.1)
117 .cloned()
118 }
119
120 pub fn register_session(&mut self, session: Arc<RwLock<Session>>) {
122 let session_id = {
123 let session = trace_read_lock!(session);
124 session.session_id().clone()
125 };
126 self.sessions.insert(session_id, session);
127 }
128
129 pub fn deregister_session(
131 &mut self,
132 session: Arc<RwLock<Session>>,
133 ) -> Option<Arc<RwLock<Session>>> {
134 let session = trace_read_lock!(session);
135 let session_id = session.session_id();
136 debug!(
137 "deregister_session with session id {}, auth token {}",
138 session_id,
139 session.authentication_token()
140 );
141 let result = self.sessions.remove(session_id);
142 debug!(
143 "deregister_session, new session count = {}",
144 self.sessions.len()
145 );
146 self.sessions_terminated = self.sessions.is_empty();
147 result
148 }
149}
150
/// State held by the server for a single client session.
pub struct Session {
    /// Identifier of the session, generated by `next_session_id()` at construction.
    session_id: NodeId,
    /// Security policy URI; empty until `set_security_policy_uri` is called.
    security_policy_uri: String,
    /// Id of the secure channel associated with the session; starts at 0.
    secure_channel_id: u32,
    /// The client's certificate, if one has been set.
    client_certificate: Option<X509>,
    /// Authentication token of the session; a null node id until set.
    authentication_token: NodeId,
    /// Session nonce; null until set.
    session_nonce: ByteString,
    /// Name of the session; null until set.
    session_name: UAString,
    /// Session timeout; starts at 0.
    // NOTE(review): unit is not visible in this file (presumably milliseconds) — confirm.
    session_timeout: f64,
    /// Identity of the session's user; `client_user_id` derives the user id from it.
    user_identity: IdentityToken,
    /// Locale ids of the session, if any have been set.
    locale_ids: Option<Vec<UAString>>,
    /// Maximum request message size; starts at 0.
    max_request_message_size: u32,
    /// Maximum response message size; starts at 0.
    max_response_message_size: u32,
    /// Endpoint url of the session; null until set.
    endpoint_url: UAString,
    /// Maximum number of browse continuation points kept in `browse_continuation_points`.
    max_browse_continuation_points: usize,
    /// Browse continuation points, oldest at the front: new points are pushed to the back and
    /// the oldest are evicted from the front when the limit is reached.
    browse_continuation_points: VecDeque<BrowseContinuationPoint>,
    /// Server diagnostics, notified when the session is created and when it is dropped.
    diagnostics: Arc<RwLock<ServerDiagnostics>>,
    /// Diagnostics of this session, used to register / deregister it with an address space.
    session_diagnostics: Arc<RwLock<SessionDiagnostics>>,
    /// Activation flag, see `set_activated` / `is_activated`.
    activated: bool,
    /// Set by `terminate_session()` and reported by `is_session_terminated()`.
    terminate_session: bool,
    /// Initialised to the construction time and overwritten with the current time by
    /// `set_terminated()`.
    terminated_at: DateTimeUtc,
    /// Set by `set_terminated()` and reported by `is_terminated()`.
    terminated: bool,
    /// Whether this session may modify the address space; `Session::new` takes the value from
    /// the server config limit `clients_can_modify_address_space`.
    can_modify_address_space: bool,
    /// Timestamp of the last service request; initialised to the construction time and
    /// updated through `set_last_service_request_timestamp`.
    last_service_request_timestamp: DateTimeUtc,
    /// Subscriptions belonging to the session.
    subscriptions: Subscriptions,
}
203
204impl Drop for Session {
205 fn drop(&mut self) {
206 info!("Session is being dropped");
207 let mut diagnostics = trace_write_lock!(self.diagnostics);
208 diagnostics.on_destroy_session(self);
209 }
210}
211
212impl Session {
213 #[cfg(test)]
214 pub fn new_no_certificate_store() -> Session {
215 let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
216 let session = Session {
217 subscriptions: Subscriptions::new(100, PUBLISH_REQUEST_TIMEOUT),
218 session_id: next_session_id(),
219 secure_channel_id: 0,
220 activated: false,
221 terminate_session: false,
222 terminated: false,
223 terminated_at: chrono::Utc::now(),
224 client_certificate: None,
225 security_policy_uri: String::new(),
226 authentication_token: NodeId::null(),
227 session_nonce: ByteString::null(),
228 session_name: UAString::null(),
229 session_timeout: 0f64,
230 user_identity: IdentityToken::None,
231 locale_ids: None,
232 max_request_message_size: 0,
233 max_response_message_size: 0,
234 endpoint_url: UAString::null(),
235 max_browse_continuation_points,
236 browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
237 can_modify_address_space: true,
238 diagnostics: Arc::new(RwLock::new(ServerDiagnostics::default())),
239 session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
240 last_service_request_timestamp: Utc::now(),
241 };
242
243 {
244 let mut diagnostics = trace_write_lock!(session.diagnostics);
245 diagnostics.on_create_session(&session);
246 }
247 session
248 }
249
250 pub fn new(server_state: Arc<RwLock<ServerState>>) -> Session {
252 let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
253
254 let server_state = trace_read_lock!(server_state);
255 let max_subscriptions = server_state.max_subscriptions;
256 let diagnostics = server_state.diagnostics.clone();
257 let can_modify_address_space = {
258 let config = trace_read_lock!(server_state.config);
259 config.limits.clients_can_modify_address_space
260 };
261
262 let session = Session {
263 subscriptions: Subscriptions::new(max_subscriptions, PUBLISH_REQUEST_TIMEOUT),
264 session_id: next_session_id(),
265 secure_channel_id: 0,
266 activated: false,
267 terminate_session: false,
268 terminated: false,
269 terminated_at: chrono::Utc::now(),
270 client_certificate: None,
271 security_policy_uri: String::new(),
272 authentication_token: NodeId::null(),
273 session_nonce: ByteString::null(),
274 session_name: UAString::null(),
275 session_timeout: 0f64,
276 user_identity: IdentityToken::None,
277 locale_ids: None,
278 max_request_message_size: 0,
279 max_response_message_size: 0,
280 endpoint_url: UAString::null(),
281 max_browse_continuation_points,
282 browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
283 can_modify_address_space,
284 diagnostics,
285 session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
286 last_service_request_timestamp: Utc::now(),
287 };
288 {
289 let mut diagnostics = trace_write_lock!(session.diagnostics);
290 diagnostics.on_create_session(&session);
291 }
292 session
293 }
294
295 pub fn session_id(&self) -> &NodeId {
296 &self.session_id
297 }
298
299 pub fn set_activated(&mut self, activated: bool) {
300 self.activated = activated;
301 }
302
303 pub fn is_activated(&self) -> bool {
304 self.activated
305 }
306
307 pub fn is_terminated(&self) -> bool {
308 self.terminated
309 }
310
311 pub fn terminated_at(&self) -> DateTimeUtc {
312 self.terminated_at
313 }
314
315 pub fn set_terminated(&mut self) {
316 info!("Session being set to terminated");
317 self.terminated = true;
318 self.terminated_at = chrono::Utc::now();
319 }
320
321 pub fn secure_channel_id(&self) -> u32 {
322 self.secure_channel_id
323 }
324
325 pub fn set_secure_channel_id(&mut self, secure_channel_id: u32) {
326 self.secure_channel_id = secure_channel_id;
327 }
328
329 pub fn authentication_token(&self) -> &NodeId {
330 &self.authentication_token
331 }
332
333 pub fn set_authentication_token(&mut self, authentication_token: NodeId) {
334 self.authentication_token = authentication_token;
335 }
336
337 pub fn session_timeout(&self) -> f64 {
338 self.session_timeout
339 }
340
341 pub fn set_session_timeout(&mut self, session_timeout: f64) {
342 self.session_timeout = session_timeout;
343 }
344
345 pub fn set_max_request_message_size(&mut self, max_request_message_size: u32) {
346 self.max_request_message_size = max_request_message_size;
347 }
348
349 pub fn set_max_response_message_size(&mut self, max_response_message_size: u32) {
350 self.max_response_message_size = max_response_message_size;
351 }
352
353 pub fn endpoint_url(&self) -> &UAString {
354 &self.endpoint_url
355 }
356
357 pub fn set_endpoint_url(&mut self, endpoint_url: UAString) {
358 self.endpoint_url = endpoint_url;
359 }
360
361 pub fn set_security_policy_uri(&mut self, security_policy_uri: &str) {
362 self.security_policy_uri = security_policy_uri.to_string();
363 }
364
365 pub fn set_user_identity(&mut self, user_identity: IdentityToken) {
366 self.user_identity = user_identity;
367 }
368
369 pub fn last_service_request_timestamp(&self) -> DateTimeUtc {
370 self.last_service_request_timestamp
371 }
372
373 pub fn set_last_service_request_timestamp(
374 &mut self,
375 last_service_request_timestamp: DateTimeUtc,
376 ) {
377 self.last_service_request_timestamp = last_service_request_timestamp;
378 }
379
380 pub fn locale_ids(&self) -> &Option<Vec<UAString>> {
381 &self.locale_ids
382 }
383
384 pub fn set_locale_ids(&mut self, locale_ids: Option<Vec<UAString>>) {
385 self.locale_ids = locale_ids;
386 }
387
388 pub fn client_certificate(&self) -> &Option<X509> {
389 &self.client_certificate
390 }
391
392 pub fn set_client_certificate(&mut self, client_certificate: Option<X509>) {
393 self.client_certificate = client_certificate;
394 }
395
396 pub fn session_nonce(&self) -> &ByteString {
397 &self.session_nonce
398 }
399
400 pub fn set_session_nonce(&mut self, session_nonce: ByteString) {
401 self.session_nonce = session_nonce;
402 }
403
404 pub fn session_name(&self) -> &UAString {
405 &self.session_name
406 }
407
408 pub fn set_session_name(&mut self, session_name: UAString) {
409 self.session_name = session_name;
410 }
411
412 pub(crate) fn session_diagnostics(&self) -> Arc<RwLock<SessionDiagnostics>> {
413 self.session_diagnostics.clone()
414 }
415
416 pub(crate) fn subscriptions(&self) -> &Subscriptions {
417 &self.subscriptions
418 }
419
420 pub(crate) fn subscriptions_mut(&mut self) -> &mut Subscriptions {
421 &mut self.subscriptions
422 }
423
424 pub(crate) fn enqueue_publish_request(
425 &mut self,
426 now: &DateTimeUtc,
427 request_id: u32,
428 request: PublishRequest,
429 address_space: &AddressSpace,
430 ) -> Result<(), StatusCode> {
431 self.subscriptions
432 .enqueue_publish_request(now, request_id, request, address_space)
433 }
434
435 pub(crate) fn tick_subscriptions(
436 &mut self,
437 now: &DateTimeUtc,
438 address_space: &AddressSpace,
439 reason: TickReason,
440 ) -> Result<(), StatusCode> {
441 self.subscriptions.tick(now, address_space, reason)
442 }
443
444 pub(crate) fn reset_subscription_lifetime_counter(&mut self, subscription_id: u32) {
447 if let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
448 subscription.reset_lifetime_counter();
449 }
450 }
451
452 pub(crate) fn expire_stale_publish_requests(&mut self, now: &DateTimeUtc) {
455 self.subscriptions.expire_stale_publish_requests(now);
456 }
457
458 pub(crate) fn add_browse_continuation_point(
459 &mut self,
460 continuation_point: BrowseContinuationPoint,
461 ) {
462 while self.browse_continuation_points.len() >= self.max_browse_continuation_points {
464 let continuation_point = self.browse_continuation_points.pop_front();
465 debug!(
466 "Removing old continuation point {} to make way for new one",
467 continuation_point.unwrap().id.as_base64()
468 );
469 }
470 self.browse_continuation_points
471 .push_back(continuation_point);
472 }
473
474 pub(crate) fn find_browse_continuation_point(
476 &mut self,
477 id: &ByteString,
478 ) -> Option<BrowseContinuationPoint> {
479 if let Some(idx) = self
480 .browse_continuation_points
481 .iter()
482 .position(|continuation_point| continuation_point.id == *id)
483 {
484 self.browse_continuation_points.remove(idx)
485 } else {
486 None
487 }
488 }
489
490 pub(crate) fn remove_expired_browse_continuation_points(
491 &mut self,
492 address_space: &AddressSpace,
493 ) {
494 self.browse_continuation_points.retain(|continuation_point| {
495 let valid = continuation_point.is_valid_browse_continuation_point(address_space);
496 if !valid {
497 debug!("Continuation point {:?} is no longer valid and will be removed, address space last modified = {}", continuation_point, address_space.last_modified());
498 }
499 valid
500 });
501 }
502
503 pub(crate) fn remove_browse_continuation_points(&mut self, continuation_points: &[ByteString]) {
505 let continuation_points_set: HashSet<ByteString> =
507 continuation_points.iter().cloned().collect();
508 self.browse_continuation_points
510 .retain(|continuation_point| !continuation_points_set.contains(&continuation_point.id));
511 }
512
513 pub(crate) fn can_modify_address_space(&self) -> bool {
514 self.can_modify_address_space
515 }
516
517 #[cfg(test)]
518 pub(crate) fn set_can_modify_address_space(&mut self, can_modify_address_space: bool) {
519 self.can_modify_address_space = can_modify_address_space;
520 }
521
522 pub(crate) fn effective_user_access_level(
523 &self,
524 user_access_level: UserAccessLevel,
525 _node_id: &NodeId,
526 _attribute_id: AttributeId,
527 ) -> UserAccessLevel {
528 user_access_level
530 }
531
532 pub fn client_user_id(&self) -> UAString {
536 match self.user_identity {
537 IdentityToken::None | IdentityToken::AnonymousIdentityToken(_) => UAString::null(),
538 IdentityToken::UserNameIdentityToken(ref token) => token.user_name.clone(),
539 IdentityToken::X509IdentityToken(ref token) => {
540 if let Ok(cert) = X509::from_byte_string(&token.certificate_data) {
541 UAString::from(cert.subject_name())
542 } else {
543 UAString::from("Invalid certificate")
544 }
545 }
546 IdentityToken::Invalid(_) => UAString::from("invalid"),
547 }
548 }
549
550 pub fn is_session_terminated(&self) -> bool {
551 self.terminate_session
552 }
553
554 pub fn terminate_session(&mut self) {
555 self.terminate_session = true;
556 }
557
558 pub(crate) fn register_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
559 let session_diagnostics = trace_read_lock!(self.session_diagnostics);
560 let mut address_space = trace_write_lock!(address_space);
561 session_diagnostics.register_session(self, &mut address_space);
562 }
563
564 pub(crate) fn deregister_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
565 let session_diagnostics = trace_read_lock!(self.session_diagnostics);
566 let mut address_space = trace_write_lock!(address_space);
567 session_diagnostics.deregister_session(self, &mut address_space);
568 }
569}