1use std::sync::Arc;
8
9use crate::core::prelude::*;
10use crate::crypto::{user_identity, PrivateKey, SecurityPolicy, X509};
11use crate::sync::*;
12use crate::types::{
13 profiles,
14 service_types::{
15 ActivateSessionRequest, AnonymousIdentityToken, ApplicationDescription, ApplicationType,
16 EndpointDescription, RegisteredServer, ServerState as ServerStateType, SignatureData,
17 UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken,
18 },
19 status_code::StatusCode,
20};
21
22use crate::server::{
23 callbacks::{RegisterNodes, UnregisterNodes},
24 config::{ServerConfig, ServerEndpoint},
25 constants,
26 diagnostics::ServerDiagnostics,
27 events::{
28 audit::{AuditEvent, AuditLog},
29 event::Event,
30 },
31 historical::{HistoricalDataProvider, HistoricalEventProvider},
32 identity_token::{
33 IdentityToken, POLICY_ID_ANONYMOUS, POLICY_ID_USER_PASS_NONE, POLICY_ID_USER_PASS_RSA_15,
34 POLICY_ID_USER_PASS_RSA_OAEP, POLICY_ID_X509,
35 },
36};
37
/// Per-request size limits the server applies to individual service calls.
///
/// Every field is a plain count. The `Default` implementation fills each one
/// from the correspondingly named value in `constants`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct OperationalLimits {
    /// Limit on browse paths in one translate-browse-paths-to-node-ids call.
    pub max_nodes_per_translate_browse_paths_to_node_ids: usize,
    /// Limit on nodes in one read call.
    pub max_nodes_per_read: usize,
    /// Limit on nodes in one write call.
    pub max_nodes_per_write: usize,
    /// Limit on methods in one call request.
    pub max_nodes_per_method_call: usize,
    /// Limit on nodes in one browse call.
    pub max_nodes_per_browse: usize,
    /// Limit on nodes in one register-nodes call.
    pub max_nodes_per_register_nodes: usize,
    /// Limit on nodes in one node-management call.
    pub max_nodes_per_node_management: usize,
    /// Limit on monitored items in one call.
    pub max_monitored_items_per_call: usize,
    /// Limit on nodes in one history-read (data) call.
    pub max_nodes_per_history_read_data: usize,
    /// Limit on nodes in one history-read (events) call.
    pub max_nodes_per_history_read_events: usize,
    /// Limit on nodes in one history-update (data) call.
    pub max_nodes_per_history_update_data: usize,
    /// Limit on nodes in one history-update (events) call.
    pub max_nodes_per_history_update_events: usize,
}
52
53impl Default for OperationalLimits {
54 fn default() -> Self {
55 Self {
56 max_nodes_per_translate_browse_paths_to_node_ids:
57 constants::MAX_NODES_PER_TRANSLATE_BROWSE_PATHS_TO_NODE_IDS,
58 max_nodes_per_read: constants::MAX_NODES_PER_READ,
59 max_nodes_per_write: constants::MAX_NODES_PER_WRITE,
60 max_nodes_per_method_call: constants::MAX_NODES_PER_METHOD_CALL,
61 max_nodes_per_browse: constants::MAX_NODES_PER_BROWSE,
62 max_nodes_per_register_nodes: constants::MAX_NODES_PER_REGISTER_NODES,
63 max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
64 max_monitored_items_per_call: constants::MAX_MONITORED_ITEMS_PER_CALL,
65 max_nodes_per_history_read_data: constants::MAX_NODES_PER_HISTORY_READ_DATA,
66 max_nodes_per_history_read_events: constants::MAX_NODES_PER_HISTORY_READ_EVENTS,
67 max_nodes_per_history_update_data: constants::MAX_NODES_PER_HISTORY_UPDATE_DATA,
68 max_nodes_per_history_update_events: constants::MAX_NODES_PER_HISTORY_UPDATE_EVENTS,
69 }
70 }
71}
72
73pub struct ServerState {
76 pub application_uri: UAString,
78 pub product_uri: UAString,
80 pub application_name: LocalizedText,
82 pub base_endpoint: String,
84 pub start_time: DateTime,
86 pub servers: Vec<String>,
88 pub config: Arc<RwLock<ServerConfig>>,
90 pub server_certificate: Option<X509>,
92 pub server_pkey: Option<PrivateKey>,
94 pub last_subscription_id: u32,
97 pub max_subscriptions: usize,
99 pub max_monitored_items_per_sub: usize,
101 pub max_monitored_item_queue_size: usize,
103 pub min_publishing_interval_ms: Duration,
105 pub min_sampling_interval_ms: Duration,
107 pub default_keep_alive_count: u32,
109 pub max_keep_alive_count: u32,
111 pub max_lifetime_count: u32,
113 pub(crate) operational_limits: OperationalLimits,
115 pub state: ServerStateType,
117 pub abort: bool,
119 pub(crate) audit_log: Arc<RwLock<AuditLog>>,
121 pub(crate) diagnostics: Arc<RwLock<ServerDiagnostics>>,
123 pub(crate) register_nodes_callback: Option<Box<dyn RegisterNodes + Send + Sync>>,
125 pub(crate) unregister_nodes_callback: Option<Box<dyn UnregisterNodes + Send + Sync>>,
127 pub(crate) historical_data_provider: Option<Box<dyn HistoricalDataProvider + Send + Sync>>,
129 pub(crate) historical_event_provider: Option<Box<dyn HistoricalEventProvider + Send + Sync>>,
131 pub send_buffer_size: usize,
133 pub receive_buffer_size: usize,
135}
136
137impl ServerState {
138 pub fn endpoints(
139 &self,
140 endpoint_url: &UAString,
141 transport_profile_uris: &Option<Vec<UAString>>,
142 ) -> Option<Vec<EndpointDescription>> {
143 debug!(
145 "Endpoints requested, transport profile uris {:?}",
146 transport_profile_uris
147 );
148 if let Some(ref transport_profile_uris) = *transport_profile_uris {
149 if !transport_profile_uris.is_empty() {
151 let found_binary_transport = transport_profile_uris.iter().any(|profile_uri| {
153 profile_uri.as_ref() == profiles::TRANSPORT_PROFILE_URI_BINARY
154 });
155 if !found_binary_transport {
156 error!(
157 "Client wants to connect with a non binary transport {:#?}",
158 transport_profile_uris
159 );
160 return None;
161 }
162 }
163 }
164
165 let config = trace_read_lock!(self.config);
166 if let Ok(hostname) = hostname_from_url(endpoint_url.as_ref()) {
167 if !hostname.eq_ignore_ascii_case(&config.tcp_config.host) {
168 debug!("Endpoint url \"{}\" hostname supplied by caller does not match server's hostname \"{}\"", endpoint_url, &config.tcp_config.host);
169 }
170 let endpoints = config
171 .endpoints
172 .iter()
173 .map(|(_, e)| self.new_endpoint_description(&config, e, true))
174 .collect();
175 Some(endpoints)
176 } else {
177 warn!(
178 "Endpoint url \"{}\" is unrecognized, using default",
179 endpoint_url
180 );
181 if let Some(e) = config.default_endpoint() {
182 Some(vec![self.new_endpoint_description(&config, e, true)])
183 } else {
184 Some(vec![])
185 }
186 }
187 }
188
189 pub fn endpoint_exists(
190 &self,
191 endpoint_url: &str,
192 security_policy: SecurityPolicy,
193 security_mode: MessageSecurityMode,
194 ) -> bool {
195 let config = trace_read_lock!(self.config);
196 config
197 .find_endpoint(endpoint_url, security_policy, security_mode)
198 .is_some()
199 }
200
201 pub fn new_endpoint_descriptions(
205 &self,
206 endpoint_url: &str,
207 ) -> Option<Vec<EndpointDescription>> {
208 debug!("find_endpoint, url = {}", endpoint_url);
209 let config = trace_read_lock!(self.config);
210 let base_endpoint_url = config.base_endpoint_url();
211 let endpoints: Vec<EndpointDescription> = config
212 .endpoints
213 .iter()
214 .filter(|&(_, e)| {
215 url_matches_except_host(&e.endpoint_url(&base_endpoint_url), endpoint_url)
217 })
218 .map(|(_, e)| self.new_endpoint_description(&config, e, false))
219 .collect();
220 if endpoints.is_empty() {
221 None
222 } else {
223 Some(endpoints)
224 }
225 }
226
227 fn user_pass_security_policy_id(endpoint: &ServerEndpoint) -> UAString {
229 match endpoint.password_security_policy() {
230 SecurityPolicy::None => POLICY_ID_USER_PASS_NONE,
231 SecurityPolicy::Basic128Rsa15 => POLICY_ID_USER_PASS_RSA_15,
232 SecurityPolicy::Basic256 | SecurityPolicy::Basic256Sha256 => {
233 POLICY_ID_USER_PASS_RSA_OAEP
234 }
235 SecurityPolicy::Aes128Sha256RsaOaep | SecurityPolicy::Aes256Sha256RsaPss => {
237 POLICY_ID_USER_PASS_RSA_OAEP
238 }
239 _ => {
240 panic!()
241 }
242 }
243 .into()
244 }
245
246 fn user_pass_security_policy_uri(_endpoint: &ServerEndpoint) -> UAString {
247 UAString::null()
250 }
251
252 fn user_identity_tokens(
253 &self,
254 config: &ServerConfig,
255 endpoint: &ServerEndpoint,
256 ) -> Vec<UserTokenPolicy> {
257 let mut user_identity_tokens = Vec::with_capacity(3);
258
259 if endpoint.supports_anonymous() {
261 user_identity_tokens.push(UserTokenPolicy {
262 policy_id: UAString::from(POLICY_ID_ANONYMOUS),
263 token_type: UserTokenType::Anonymous,
264 issued_token_type: UAString::null(),
265 issuer_endpoint_url: UAString::null(),
266 security_policy_uri: UAString::null(),
267 });
268 }
269 if endpoint.supports_user_pass(&config.user_tokens) {
271 user_identity_tokens.push(UserTokenPolicy {
273 policy_id: Self::user_pass_security_policy_id(endpoint),
274 token_type: UserTokenType::UserName,
275 issued_token_type: UAString::null(),
276 issuer_endpoint_url: UAString::null(),
277 security_policy_uri: Self::user_pass_security_policy_uri(endpoint),
278 });
279 }
280 if endpoint.supports_x509(&config.user_tokens) {
282 user_identity_tokens.push(UserTokenPolicy {
283 policy_id: UAString::from(POLICY_ID_X509),
284 token_type: UserTokenType::Certificate,
285 issued_token_type: UAString::null(),
286 issuer_endpoint_url: UAString::null(),
287 security_policy_uri: UAString::from(SecurityPolicy::Basic128Rsa15.to_uri()),
288 });
289 }
290
291 if user_identity_tokens.is_empty() {
292 debug!(
293 "user_identity_tokens() returned zero endpoints for endpoint {} / {} {}",
294 endpoint.path, endpoint.security_policy, endpoint.security_mode
295 );
296 }
297
298 user_identity_tokens
299 }
300
301 fn new_endpoint_description(
303 &self,
304 config: &ServerConfig,
305 endpoint: &ServerEndpoint,
306 all_fields: bool,
307 ) -> EndpointDescription {
308 let base_endpoint_url = config.base_endpoint_url();
309
310 let user_identity_tokens = self.user_identity_tokens(config, endpoint);
311
312 let (server, server_certificate) = if all_fields {
316 (
317 ApplicationDescription {
318 application_uri: self.application_uri.clone(),
319 product_uri: self.product_uri.clone(),
320 application_name: self.application_name.clone(),
321 application_type: self.application_type(),
322 gateway_server_uri: self.gateway_server_uri(),
323 discovery_profile_uri: UAString::null(),
324 discovery_urls: self.discovery_urls(),
325 },
326 self.server_certificate_as_byte_string(),
327 )
328 } else {
329 (
330 ApplicationDescription {
331 application_uri: UAString::null(),
332 product_uri: UAString::null(),
333 application_name: LocalizedText::null(),
334 application_type: self.application_type(),
335 gateway_server_uri: self.gateway_server_uri(),
336 discovery_profile_uri: UAString::null(),
337 discovery_urls: self.discovery_urls(),
338 },
339 ByteString::null(),
340 )
341 };
342
343 EndpointDescription {
344 endpoint_url: endpoint.endpoint_url(&base_endpoint_url).into(),
345 server,
346 server_certificate,
347 security_mode: endpoint.message_security_mode(),
348 security_policy_uri: UAString::from(endpoint.security_policy().to_uri()),
349 user_identity_tokens: Some(user_identity_tokens),
350 transport_profile_uri: UAString::from(profiles::TRANSPORT_PROFILE_URI_BINARY),
351 security_level: endpoint.security_level,
352 }
353 }
354
355 pub fn discovery_urls(&self) -> Option<Vec<UAString>> {
356 let config = trace_read_lock!(self.config);
357 if config.discovery_urls.is_empty() {
358 None
359 } else {
360 Some(config.discovery_urls.iter().map(UAString::from).collect())
361 }
362 }
363
364 pub fn application_type(&self) -> ApplicationType {
365 ApplicationType::Server
366 }
367
368 pub fn gateway_server_uri(&self) -> UAString {
369 UAString::null()
370 }
371
372 pub fn abort(&mut self) {
373 info!("Server has been told to abort");
374 self.abort = true;
375 self.state = ServerStateType::Shutdown;
376 }
377
378 pub fn state(&self) -> ServerStateType {
379 self.state
380 }
381
382 pub fn set_state(&mut self, state: ServerStateType) {
383 self.state = state;
384 }
385
386 pub fn is_abort(&self) -> bool {
387 self.abort
388 }
389
390 pub fn is_running(&self) -> bool {
391 self.state == ServerStateType::Running
392 }
393
394 pub fn server_certificate_as_byte_string(&self) -> ByteString {
395 if let Some(ref server_certificate) = self.server_certificate {
396 server_certificate.as_byte_string()
397 } else {
398 ByteString::null()
399 }
400 }
401
402 pub fn registered_server(&self) -> RegisteredServer {
403 let server_uri = self.application_uri.clone();
404 let product_uri = self.product_uri.clone();
405 let gateway_server_uri = self.gateway_server_uri();
406 let discovery_urls = self.discovery_urls();
407 let server_type = self.application_type();
408 let is_online = self.is_running();
409 let server_names = Some(vec![self.application_name.clone()]);
410 RegisteredServer {
412 server_uri,
413 product_uri,
414 server_names,
415 server_type,
416 gateway_server_uri,
417 discovery_urls,
418 semaphore_file_path: UAString::null(),
419 is_online,
420 }
421 }
422
423 pub fn create_subscription_id(&mut self) -> u32 {
424 self.last_subscription_id += 1;
425 self.last_subscription_id
426 }
427
428 pub fn authenticate_endpoint(
435 &self,
436 request: &ActivateSessionRequest,
437 endpoint_url: &str,
438 security_policy: SecurityPolicy,
439 security_mode: MessageSecurityMode,
440 user_identity_token: &ExtensionObject,
441 server_nonce: &ByteString,
442 ) -> Result<String, StatusCode> {
443 let config = trace_read_lock!(self.config);
445
446 if let Some(endpoint) = config.find_endpoint(endpoint_url, security_policy, security_mode) {
447 match IdentityToken::new(user_identity_token, &self.decoding_options()) {
449 IdentityToken::None => {
450 error!("User identity token type unsupported");
451 Err(StatusCode::BadIdentityTokenInvalid)
452 }
453 IdentityToken::AnonymousIdentityToken(token) => {
454 Self::authenticate_anonymous_token(endpoint, &token)
455 }
456 IdentityToken::UserNameIdentityToken(token) => self
457 .authenticate_username_identity_token(
458 &config,
459 endpoint,
460 &token,
461 &self.server_pkey,
462 server_nonce,
463 ),
464 IdentityToken::X509IdentityToken(token) => self.authenticate_x509_identity_token(
465 &config,
466 endpoint,
467 &token,
468 &request.user_token_signature,
469 &self.server_certificate,
470 server_nonce,
471 ),
472 IdentityToken::Invalid(o) => {
473 error!("User identity token type {:?} is unsupported", o.node_id);
474 Err(StatusCode::BadIdentityTokenInvalid)
475 }
476 }
477 } else {
478 error!("Cannot find endpoint that matches path \"{}\", security policy {:?}, and security mode {:?}", endpoint_url, security_policy, security_mode);
479 Err(StatusCode::BadTcpEndpointUrlInvalid)
480 }
481 }
482
483 pub fn set_register_nodes_callbacks(
484 &mut self,
485 register_nodes_callback: Box<dyn RegisterNodes + Send + Sync>,
486 unregister_nodes_callback: Box<dyn UnregisterNodes + Send + Sync>,
487 ) {
488 self.register_nodes_callback = Some(register_nodes_callback);
489 self.unregister_nodes_callback = Some(unregister_nodes_callback);
490 }
491
492 pub fn decoding_options(&self) -> DecodingOptions {
494 let config = trace_read_lock!(self.config);
495 config.decoding_options()
496 }
497
498 fn authenticate_anonymous_token(
500 endpoint: &ServerEndpoint,
501 token: &AnonymousIdentityToken,
502 ) -> Result<String, StatusCode> {
503 if token.policy_id.as_ref() != POLICY_ID_ANONYMOUS {
504 error!("Token doesn't possess the correct policy id");
505 Err(StatusCode::BadIdentityTokenInvalid)
506 } else if !endpoint.supports_anonymous() {
507 error!(
508 "Endpoint \"{}\" does not support anonymous authentication",
509 endpoint.path
510 );
511 Err(StatusCode::BadIdentityTokenRejected)
512 } else {
513 debug!("Anonymous identity is authenticated");
514 Ok(String::from(crate::server::config::ANONYMOUS_USER_TOKEN_ID))
515 }
516 }
517
518 fn authenticate_username_identity_token(
521 &self,
522 config: &ServerConfig,
523 endpoint: &ServerEndpoint,
524 token: &UserNameIdentityToken,
525 server_key: &Option<PrivateKey>,
526 server_nonce: &ByteString,
527 ) -> Result<String, StatusCode> {
528 if !endpoint.supports_user_pass(&config.user_tokens) {
529 error!("Endpoint doesn't support username password tokens");
530 Err(StatusCode::BadIdentityTokenRejected)
531 } else if token.policy_id != Self::user_pass_security_policy_id(endpoint) {
532 error!("Token doesn't possess the correct policy id");
533 Err(StatusCode::BadIdentityTokenInvalid)
534 } else if token.user_name.is_null() {
535 error!("User identify token supplies no user name");
536 Err(StatusCode::BadIdentityTokenInvalid)
537 } else {
538 debug!(
539 "policy id = {}, encryption algorithm = {}",
540 token.policy_id.as_ref(),
541 token.encryption_algorithm.as_ref()
542 );
543 let token_password = if !token.encryption_algorithm.is_null() {
544 if let Some(ref server_key) = server_key {
545 user_identity::decrypt_user_identity_token_password(
546 token,
547 server_nonce.as_ref(),
548 server_key,
549 )?
550 } else {
551 error!("Identity token password is encrypted but no server private key was supplied");
552 return Err(StatusCode::BadIdentityTokenInvalid);
553 }
554 } else {
555 token.plaintext_password()?
556 };
557
558 for user_token_id in &endpoint.user_token_ids {
560 if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
561 if server_user_token.is_user_pass()
562 && server_user_token.user == token.user_name.as_ref()
563 {
564 let valid = if server_user_token.pass.is_none() {
566 token_password.is_empty()
568 } else {
569 let server_password =
571 server_user_token.pass.as_ref().unwrap().as_bytes();
572 server_password == token_password.as_bytes()
573 };
574 if !valid {
575 error!(
576 "Cannot authenticate \"{}\", password is invalid",
577 server_user_token.user
578 );
579 return Err(StatusCode::BadUserAccessDenied);
580 } else {
581 return Ok(user_token_id.clone());
582 }
583 }
584 }
585 }
586 error!(
587 "Cannot authenticate \"{}\", user not found for endpoint",
588 token.user_name
589 );
590 Err(StatusCode::BadUserAccessDenied)
591 }
592 }
593
594 fn authenticate_x509_identity_token(
597 &self,
598 config: &ServerConfig,
599 endpoint: &ServerEndpoint,
600 token: &X509IdentityToken,
601 user_token_signature: &SignatureData,
602 server_certificate: &Option<X509>,
603 server_nonce: &ByteString,
604 ) -> Result<String, StatusCode> {
605 if !endpoint.supports_x509(&config.user_tokens) {
606 error!("Endpoint doesn't support x509 tokens");
607 Err(StatusCode::BadIdentityTokenRejected)
608 } else if token.policy_id.as_ref() != POLICY_ID_X509 {
609 error!("Token doesn't possess the correct policy id");
610 Err(StatusCode::BadIdentityTokenRejected)
611 } else {
612 let result = match server_certificate {
613 Some(ref server_certificate) => {
614 let user_identity_tokens = self.user_identity_tokens(config, endpoint);
616 let security_policy = user_identity_tokens
617 .iter()
618 .find(|t| t.token_type == UserTokenType::Certificate)
619 .map(|t| SecurityPolicy::from_uri(t.security_policy_uri.as_ref()))
620 .unwrap_or_else(|| endpoint.security_policy());
621
622 match security_policy {
624 SecurityPolicy::Unknown | SecurityPolicy::None => {
625 Err(StatusCode::BadIdentityTokenInvalid)
626 }
627 security_policy => {
628 user_identity::verify_x509_identity_token(
630 token,
631 user_token_signature,
632 security_policy,
633 server_certificate,
634 server_nonce.as_ref(),
635 )
636 }
637 }
638 }
639 None => Err(StatusCode::BadIdentityTokenInvalid),
640 };
641 result.and_then(|_| {
642 let signing_cert = X509::from_byte_string(&token.certificate_data)?;
644 let signing_thumbprint = signing_cert.thumbprint();
645 for user_token_id in &endpoint.user_token_ids {
646 if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
647 if let Some(ref user_thumbprint) = server_user_token.thumbprint {
648 if *user_thumbprint == signing_thumbprint {
650 return Ok(user_token_id.clone());
651 }
652 }
653 }
654 }
655 Err(StatusCode::BadIdentityTokenInvalid)
656 })
657 }
658 }
659
660 pub fn set_historical_data_provider(
661 &mut self,
662 historical_data_provider: Box<dyn HistoricalDataProvider + Send + Sync>,
663 ) {
664 self.historical_data_provider = Some(historical_data_provider);
665 }
666
667 pub fn set_historical_event_provider(
668 &mut self,
669 historical_event_provider: Box<dyn HistoricalEventProvider + Send + Sync>,
670 ) {
671 self.historical_event_provider = Some(historical_event_provider);
672 }
673
674 pub(crate) fn raise_and_log<T>(&self, event: T) -> Result<NodeId, ()>
675 where
676 T: AuditEvent + Event,
677 {
678 let audit_log = trace_write_lock!(self.audit_log);
679 audit_log.raise_and_log(event)
680 }
681}