1use std::{
5 collections::{HashMap, HashSet, VecDeque},
6 sync::{
7 atomic::{AtomicI32, Ordering},
8 Arc, RwLock,
9 },
10};
11
12use chrono::Utc;
13
14use opcua_crypto::X509;
15use opcua_types::{service_types::PublishRequest, status_code::StatusCode, *};
16
17use crate::{
18 address_space::{AddressSpace, UserAccessLevel},
19 continuation_point::BrowseContinuationPoint,
20 diagnostics::ServerDiagnostics,
21 identity_token::IdentityToken,
22 session_diagnostics::SessionDiagnostics,
23 state::ServerState,
24 subscriptions::subscription::TickReason,
25 subscriptions::subscriptions::Subscriptions,
26};
27
28#[derive(Clone)]
30pub struct SessionInfo {}
31
32const PUBLISH_REQUEST_TIMEOUT: i64 = 30000;
33
34lazy_static! {
35 static ref NEXT_SESSION_ID: AtomicI32 = AtomicI32::new(1);
36}
37
38fn next_session_id() -> NodeId {
39 let session_id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
41 let session_id = format!("Session-{}", session_id);
42 NodeId::new(1, session_id)
43}
44
45pub enum ServerUserIdentityToken {
46 Empty,
47 AnonymousIdentityToken,
48 UserNameIdentityToken(UserIdentityToken),
49 X509IdentityToken(X509IdentityToken),
50 Invalid(ExtensionObject),
51}
52
53pub struct SessionManager {
54 pub sessions: HashMap<NodeId, Arc<RwLock<Session>>>,
55 pub sessions_terminated: bool,
56}
57
58impl Default for SessionManager {
59 fn default() -> Self {
60 Self {
61 sessions: HashMap::new(),
62 sessions_terminated: false,
63 }
64 }
65}
66
67impl SessionManager {
68 pub fn len(&self) -> usize {
69 self.sessions.len()
70 }
71
72 pub fn first(&self) -> Option<Arc<RwLock<Session>>> {
73 self.sessions.iter().next().map(|(_, s)| s.clone())
74 }
75
76 pub fn sessions_terminated(&self) -> bool {
77 self.sessions_terminated
78 }
79
80 pub fn clear(&mut self) {
82 self.sessions.iter().for_each(|s| {
83 let mut session = trace_write_lock!(s.1);
84 session.set_terminated();
85 });
86 self.sessions.clear();
87 }
88
89 pub fn find_session_by_id(&self, session_id: &NodeId) -> Option<Arc<RwLock<Session>>> {
91 self.sessions
92 .iter()
93 .find(|s| {
94 let session = trace_read_lock!(s.1);
95 session.session_id() == session_id
96 })
97 .map(|s| s.1)
98 .cloned()
99 }
100
101 pub fn find_session_by_token(
104 &self,
105 authentication_token: &NodeId,
106 ) -> Option<Arc<RwLock<Session>>> {
107 self.sessions
108 .iter()
109 .find(|s| {
110 let session = trace_read_lock!(s.1);
111 session.authentication_token() == authentication_token
112 })
113 .map(|s| s.1)
114 .cloned()
115 }
116
117 pub fn register_session(&mut self, session: Arc<RwLock<Session>>) {
119 let session_id = {
120 let session = trace_read_lock!(session);
121 session.session_id().clone()
122 };
123 self.sessions.insert(session_id, session);
124 }
125
126 pub fn deregister_session(
128 &mut self,
129 session: Arc<RwLock<Session>>,
130 ) -> Option<Arc<RwLock<Session>>> {
131 let session = trace_read_lock!(session);
132 let result = self.sessions.remove(session.session_id());
133 self.sessions_terminated = self.sessions.is_empty();
134 result
135 }
136}
137
138pub struct Session {
140 session_id: NodeId,
142 security_policy_uri: String,
144 client_certificate: Option<X509>,
146 authentication_token: NodeId,
148 session_nonce: ByteString,
150 session_name: UAString,
152 session_timeout: f64,
154 user_identity: IdentityToken,
156 locale_ids: Option<Vec<UAString>>,
158 max_request_message_size: u32,
160 max_response_message_size: u32,
162 endpoint_url: UAString,
164 max_browse_continuation_points: usize,
166 browse_continuation_points: VecDeque<BrowseContinuationPoint>,
168 diagnostics: Arc<RwLock<ServerDiagnostics>>,
170 session_diagnostics: Arc<RwLock<SessionDiagnostics>>,
172 activated: bool,
174 terminate_session: bool,
176 terminated_at: DateTimeUtc,
178 terminated: bool,
180 can_modify_address_space: bool,
183 last_service_request_timestamp: DateTimeUtc,
185 subscriptions: Subscriptions,
187}
188
189impl Drop for Session {
190 fn drop(&mut self) {
191 info!("Session is being dropped");
192 let mut diagnostics = trace_write_lock!(self.diagnostics);
193 diagnostics.on_destroy_session(self);
194 }
195}
196
197impl Session {
198 #[cfg(test)]
199 pub fn new_no_certificate_store() -> Session {
200 let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
201 let session = Session {
202 subscriptions: Subscriptions::new(100, PUBLISH_REQUEST_TIMEOUT),
203 session_id: next_session_id(),
204 activated: false,
205 terminate_session: false,
206 terminated: false,
207 terminated_at: chrono::Utc::now(),
208 client_certificate: None,
209 security_policy_uri: String::new(),
210 authentication_token: NodeId::null(),
211 session_nonce: ByteString::null(),
212 session_name: UAString::null(),
213 session_timeout: 0f64,
214 user_identity: IdentityToken::None,
215 locale_ids: None,
216 max_request_message_size: 0,
217 max_response_message_size: 0,
218 endpoint_url: UAString::null(),
219 max_browse_continuation_points,
220 browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
221 can_modify_address_space: true,
222 diagnostics: Arc::new(RwLock::new(ServerDiagnostics::default())),
223 session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
224 last_service_request_timestamp: Utc::now(),
225 };
226
227 {
228 let mut diagnostics = trace_write_lock!(session.diagnostics);
229 diagnostics.on_create_session(&session);
230 }
231 session
232 }
233
234 pub fn new(server_state: Arc<RwLock<ServerState>>) -> Session {
236 let max_browse_continuation_points = super::constants::MAX_BROWSE_CONTINUATION_POINTS;
237
238 let server_state = trace_read_lock!(server_state);
239 let max_subscriptions = server_state.max_subscriptions;
240 let diagnostics = server_state.diagnostics.clone();
241 let can_modify_address_space = {
242 let config = trace_read_lock!(server_state.config);
243 config.limits.clients_can_modify_address_space
244 };
245
246 let session = Session {
247 subscriptions: Subscriptions::new(max_subscriptions, PUBLISH_REQUEST_TIMEOUT),
248 session_id: next_session_id(),
249 activated: false,
250 terminate_session: false,
251 terminated: false,
252 terminated_at: chrono::Utc::now(),
253 client_certificate: None,
254 security_policy_uri: String::new(),
255 authentication_token: NodeId::null(),
256 session_nonce: ByteString::null(),
257 session_name: UAString::null(),
258 session_timeout: 0f64,
259 user_identity: IdentityToken::None,
260 locale_ids: None,
261 max_request_message_size: 0,
262 max_response_message_size: 0,
263 endpoint_url: UAString::null(),
264 max_browse_continuation_points,
265 browse_continuation_points: VecDeque::with_capacity(max_browse_continuation_points),
266 can_modify_address_space,
267 diagnostics,
268 session_diagnostics: Arc::new(RwLock::new(SessionDiagnostics::default())),
269 last_service_request_timestamp: Utc::now(),
270 };
271 {
272 let mut diagnostics = trace_write_lock!(session.diagnostics);
273 diagnostics.on_create_session(&session);
274 }
275 session
276 }
277
278 pub fn session_id(&self) -> &NodeId {
279 &self.session_id
280 }
281
282 pub fn set_activated(&mut self, activated: bool) {
283 self.activated = activated;
284 }
285
286 pub fn is_activated(&self) -> bool {
287 self.activated
288 }
289
290 pub fn is_terminated(&self) -> bool {
291 self.terminated
292 }
293
294 pub fn terminated_at(&self) -> DateTimeUtc {
295 self.terminated_at
296 }
297
298 pub fn set_terminated(&mut self) {
299 info!("Session being set to terminated");
300 self.terminated = true;
301 self.terminated_at = chrono::Utc::now();
302 }
303
304 pub fn authentication_token(&self) -> &NodeId {
305 &self.authentication_token
306 }
307
308 pub fn set_authentication_token(&mut self, authentication_token: NodeId) {
309 self.authentication_token = authentication_token;
310 }
311
312 pub fn session_timeout(&self) -> f64 {
313 self.session_timeout
314 }
315
316 pub fn set_session_timeout(&mut self, session_timeout: f64) {
317 self.session_timeout = session_timeout;
318 }
319
320 pub fn set_max_request_message_size(&mut self, max_request_message_size: u32) {
321 self.max_request_message_size = max_request_message_size;
322 }
323
324 pub fn set_max_response_message_size(&mut self, max_response_message_size: u32) {
325 self.max_response_message_size = max_response_message_size;
326 }
327
328 pub fn endpoint_url(&self) -> &UAString {
329 &self.endpoint_url
330 }
331
332 pub fn set_endpoint_url(&mut self, endpoint_url: UAString) {
333 self.endpoint_url = endpoint_url;
334 }
335
336 pub fn set_security_policy_uri(&mut self, security_policy_uri: &str) {
337 self.security_policy_uri = security_policy_uri.to_string();
338 }
339
340 pub fn set_user_identity(&mut self, user_identity: IdentityToken) {
341 self.user_identity = user_identity;
342 }
343
344 pub fn last_service_request_timestamp(&self) -> DateTimeUtc {
345 self.last_service_request_timestamp
346 }
347
348 pub fn set_last_service_request_timestamp(
349 &mut self,
350 last_service_request_timestamp: DateTimeUtc,
351 ) {
352 self.last_service_request_timestamp = last_service_request_timestamp;
353 }
354
355 pub fn locale_ids(&self) -> &Option<Vec<UAString>> {
356 &self.locale_ids
357 }
358
359 pub fn set_locale_ids(&mut self, locale_ids: Option<Vec<UAString>>) {
360 self.locale_ids = locale_ids;
361 }
362
363 pub fn client_certificate(&self) -> &Option<X509> {
364 &self.client_certificate
365 }
366
367 pub fn set_client_certificate(&mut self, client_certificate: Option<X509>) {
368 self.client_certificate = client_certificate;
369 }
370
371 pub fn session_nonce(&self) -> &ByteString {
372 &self.session_nonce
373 }
374
375 pub fn set_session_nonce(&mut self, session_nonce: ByteString) {
376 self.session_nonce = session_nonce;
377 }
378
379 pub fn session_name(&self) -> &UAString {
380 &self.session_name
381 }
382
383 pub fn set_session_name(&mut self, session_name: UAString) {
384 self.session_name = session_name;
385 }
386
387 pub(crate) fn session_diagnostics(&self) -> Arc<RwLock<SessionDiagnostics>> {
388 self.session_diagnostics.clone()
389 }
390
391 pub(crate) fn subscriptions(&self) -> &Subscriptions {
392 &self.subscriptions
393 }
394
395 pub(crate) fn subscriptions_mut(&mut self) -> &mut Subscriptions {
396 &mut self.subscriptions
397 }
398
399 pub(crate) fn enqueue_publish_request(
400 &mut self,
401 now: &DateTimeUtc,
402 request_id: u32,
403 request: PublishRequest,
404 address_space: &AddressSpace,
405 ) -> Result<(), StatusCode> {
406 self.subscriptions
407 .enqueue_publish_request(now, request_id, request, address_space)
408 }
409
410 pub(crate) fn tick_subscriptions(
411 &mut self,
412 now: &DateTimeUtc,
413 address_space: &AddressSpace,
414 reason: TickReason,
415 ) -> Result<(), StatusCode> {
416 self.subscriptions.tick(now, address_space, reason)
417 }
418
419 pub(crate) fn reset_subscription_lifetime_counter(&mut self, subscription_id: u32) {
422 if let Some(subscription) = self.subscriptions.get_mut(subscription_id) {
423 subscription.reset_lifetime_counter();
424 }
425 }
426
427 pub(crate) fn expire_stale_publish_requests(&mut self, now: &DateTimeUtc) {
430 self.subscriptions.expire_stale_publish_requests(now);
431 }
432
433 pub(crate) fn add_browse_continuation_point(
434 &mut self,
435 continuation_point: BrowseContinuationPoint,
436 ) {
437 while self.browse_continuation_points.len() >= self.max_browse_continuation_points {
439 let continuation_point = self.browse_continuation_points.pop_front();
440 debug!(
441 "Removing old continuation point {} to make way for new one",
442 continuation_point.unwrap().id.as_base64()
443 );
444 }
445 self.browse_continuation_points
446 .push_back(continuation_point);
447 }
448
449 pub(crate) fn find_browse_continuation_point(
451 &mut self,
452 id: &ByteString,
453 ) -> Option<BrowseContinuationPoint> {
454 if let Some(idx) = self
455 .browse_continuation_points
456 .iter()
457 .position(|continuation_point| continuation_point.id == *id)
458 {
459 self.browse_continuation_points.remove(idx)
460 } else {
461 None
462 }
463 }
464
465 pub(crate) fn remove_expired_browse_continuation_points(
466 &mut self,
467 address_space: &AddressSpace,
468 ) {
469 self.browse_continuation_points.retain(|continuation_point| {
470 let valid = continuation_point.is_valid_browse_continuation_point(address_space);
471 if !valid {
472 debug!("Continuation point {:?} is no longer valid and will be removed, address space last modified = {}", continuation_point, address_space.last_modified());
473 }
474 valid
475 });
476 }
477
478 pub(crate) fn remove_browse_continuation_points(&mut self, continuation_points: &[ByteString]) {
480 let continuation_points_set: HashSet<ByteString> =
482 continuation_points.iter().cloned().collect();
483 self.browse_continuation_points
485 .retain(|continuation_point| !continuation_points_set.contains(&continuation_point.id));
486 }
487
488 pub(crate) fn can_modify_address_space(&self) -> bool {
489 self.can_modify_address_space
490 }
491
492 #[cfg(test)]
493 pub(crate) fn set_can_modify_address_space(&mut self, can_modify_address_space: bool) {
494 self.can_modify_address_space = can_modify_address_space;
495 }
496
497 pub(crate) fn effective_user_access_level(
498 &self,
499 user_access_level: UserAccessLevel,
500 _node_id: &NodeId,
501 _attribute_id: AttributeId,
502 ) -> UserAccessLevel {
503 user_access_level
505 }
506
507 pub fn client_user_id(&self) -> UAString {
511 match self.user_identity {
512 IdentityToken::None | IdentityToken::AnonymousIdentityToken(_) => UAString::null(),
513 IdentityToken::UserNameIdentityToken(ref token) => token.user_name.clone(),
514 IdentityToken::X509IdentityToken(ref token) => {
515 if let Ok(cert) = X509::from_byte_string(&token.certificate_data) {
516 UAString::from(cert.subject_name())
517 } else {
518 UAString::from("Invalid certificate")
519 }
520 }
521 IdentityToken::Invalid(_) => UAString::from("invalid"),
522 }
523 }
524
525 pub fn is_session_terminated(&self) -> bool {
526 self.terminate_session
527 }
528
529 pub fn terminate_session(&mut self) {
530 self.terminate_session = true;
531 }
532
533 pub(crate) fn register_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
534 let session_diagnostics = trace_read_lock!(self.session_diagnostics);
535 let mut address_space = trace_write_lock!(address_space);
536 session_diagnostics.register_session(self, &mut address_space);
537 }
538
539 pub(crate) fn deregister_session(&self, address_space: Arc<RwLock<AddressSpace>>) {
540 let session_diagnostics = trace_read_lock!(self.session_diagnostics);
541 let mut address_space = trace_write_lock!(address_space);
542 session_diagnostics.deregister_session(self, &mut address_space);
543 }
544}