1use std::sync::{Arc, RwLock};
8
9use opcua_core::prelude::*;
10use opcua_crypto::{user_identity, PrivateKey, SecurityPolicy, X509};
11use opcua_types::{
12 profiles,
13 service_types::{
14 ActivateSessionRequest, AnonymousIdentityToken, ApplicationDescription, ApplicationType,
15 EndpointDescription, RegisteredServer, ServerState as ServerStateType, SignatureData,
16 UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken,
17 },
18 status_code::StatusCode,
19};
20
21use crate::{
22 callbacks::{RegisterNodes, UnregisterNodes},
23 config::{ServerConfig, ServerEndpoint},
24 constants,
25 diagnostics::ServerDiagnostics,
26 events::{
27 audit::{AuditEvent, AuditLog},
28 event::Event,
29 },
30 historical::{HistoricalDataProvider, HistoricalEventProvider},
31 identity_token::{
32 IdentityToken, POLICY_ID_ANONYMOUS, POLICY_ID_USER_PASS_NONE, POLICY_ID_USER_PASS_RSA_15,
33 POLICY_ID_USER_PASS_RSA_OAEP, POLICY_ID_X509,
34 },
35};
36
/// Per-request size limits held by the server state.
///
/// Every field is a count. The `Default` implementation fills each field from
/// the like-named constant in `crate::constants`.
pub(crate) struct OperationalLimits {
    /// Limit on nodes per translate-browse-paths-to-node-ids request.
    pub max_nodes_per_translate_browse_paths_to_node_ids: usize,
    /// Limit on nodes per read request.
    pub max_nodes_per_read: usize,
    /// Limit on nodes per write request.
    pub max_nodes_per_write: usize,
    /// Limit on nodes per method call request.
    pub max_nodes_per_method_call: usize,
    /// Limit on nodes per browse request.
    pub max_nodes_per_browse: usize,
    /// Limit on nodes per register-nodes request.
    pub max_nodes_per_register_nodes: usize,
    /// Limit on nodes per node-management request.
    pub max_nodes_per_node_management: usize,
    /// Limit on monitored items per call.
    pub max_monitored_items_per_call: usize,
    /// Limit on nodes per history-read (data) request.
    pub max_nodes_per_history_read_data: usize,
    /// Limit on nodes per history-read (events) request.
    pub max_nodes_per_history_read_events: usize,
    /// Limit on nodes per history-update (data) request.
    pub max_nodes_per_history_update_data: usize,
    /// Limit on nodes per history-update (events) request.
    pub max_nodes_per_history_update_events: usize,
}
51
52impl Default for OperationalLimits {
53 fn default() -> Self {
54 Self {
55 max_nodes_per_translate_browse_paths_to_node_ids:
56 constants::MAX_NODES_PER_TRANSLATE_BROWSE_PATHS_TO_NODE_IDS,
57 max_nodes_per_read: constants::MAX_NODES_PER_READ,
58 max_nodes_per_write: constants::MAX_NODES_PER_WRITE,
59 max_nodes_per_method_call: constants::MAX_NODES_PER_METHOD_CALL,
60 max_nodes_per_browse: constants::MAX_NODES_PER_BROWSE,
61 max_nodes_per_register_nodes: constants::MAX_NODES_PER_REGISTER_NODES,
62 max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
63 max_monitored_items_per_call: constants::MAX_MONITORED_ITEMS_PER_CALL,
64 max_nodes_per_history_read_data: constants::MAX_NODES_PER_HISTORY_READ_DATA,
65 max_nodes_per_history_read_events: constants::MAX_NODES_PER_HISTORY_READ_EVENTS,
66 max_nodes_per_history_update_data: constants::MAX_NODES_PER_HISTORY_UPDATE_DATA,
67 max_nodes_per_history_update_events: constants::MAX_NODES_PER_HISTORY_UPDATE_EVENTS,
68 }
69 }
70}
71
72pub struct ServerState {
75 pub application_uri: UAString,
77 pub product_uri: UAString,
79 pub application_name: LocalizedText,
81 pub base_endpoint: String,
83 pub start_time: DateTime,
85 pub servers: Vec<String>,
87 pub config: Arc<RwLock<ServerConfig>>,
89 pub server_certificate: Option<X509>,
91 pub server_pkey: Option<PrivateKey>,
93 pub last_subscription_id: u32,
96 pub max_subscriptions: usize,
98 pub max_monitored_items_per_sub: usize,
100 pub max_monitored_item_queue_size: usize,
102 pub min_publishing_interval_ms: Duration,
104 pub min_sampling_interval_ms: Duration,
106 pub default_keep_alive_count: u32,
108 pub max_keep_alive_count: u32,
110 pub max_lifetime_count: u32,
112 pub(crate) operational_limits: OperationalLimits,
114 pub state: ServerStateType,
116 pub abort: bool,
118 pub(crate) audit_log: Arc<RwLock<AuditLog>>,
120 pub(crate) diagnostics: Arc<RwLock<ServerDiagnostics>>,
122 pub(crate) register_nodes_callback: Option<Box<dyn RegisterNodes + Send + Sync>>,
124 pub(crate) unregister_nodes_callback: Option<Box<dyn UnregisterNodes + Send + Sync>>,
126 pub(crate) historical_data_provider: Option<Box<dyn HistoricalDataProvider + Send + Sync>>,
128 pub(crate) historical_event_provider: Option<Box<dyn HistoricalEventProvider + Send + Sync>>,
130}
131
132impl ServerState {
133 pub fn endpoints(
134 &self,
135 endpoint_url: &UAString,
136 transport_profile_uris: &Option<Vec<UAString>>,
137 ) -> Option<Vec<EndpointDescription>> {
138 debug!(
140 "Endpoints requested, transport profile uris {:?}",
141 transport_profile_uris
142 );
143 if let Some(ref transport_profile_uris) = *transport_profile_uris {
144 if !transport_profile_uris.is_empty() {
146 let found_binary_transport = transport_profile_uris.iter().any(|profile_uri| {
148 profile_uri.as_ref() == profiles::TRANSPORT_PROFILE_URI_BINARY
149 });
150 if !found_binary_transport {
151 error!(
152 "Client wants to connect with a non binary transport {:#?}",
153 transport_profile_uris
154 );
155 return None;
156 }
157 }
158 }
159
160 let config = trace_read_lock!(self.config);
161 if let Ok(hostname) = hostname_from_url(endpoint_url.as_ref()) {
162 if !hostname.eq_ignore_ascii_case(&config.tcp_config.host) {
163 debug!("Endpoint url \"{}\" hostname supplied by caller does not match server's hostname \"{}\"", endpoint_url, &config.tcp_config.host);
164 }
165 let endpoints = config
166 .endpoints
167 .iter()
168 .map(|(_, e)| self.new_endpoint_description(&config, e, true))
169 .collect();
170 Some(endpoints)
171 } else {
172 warn!(
173 "Endpoint url \"{}\" is unrecognized, using default",
174 endpoint_url
175 );
176 if let Some(e) = config.default_endpoint() {
177 Some(vec![self.new_endpoint_description(&config, e, true)])
178 } else {
179 Some(vec![])
180 }
181 }
182 }
183
184 pub fn endpoint_exists(
185 &self,
186 endpoint_url: &str,
187 security_policy: SecurityPolicy,
188 security_mode: MessageSecurityMode,
189 ) -> bool {
190 let config = trace_read_lock!(self.config);
191 config
192 .find_endpoint(endpoint_url, security_policy, security_mode)
193 .is_some()
194 }
195
196 pub fn new_endpoint_descriptions(
200 &self,
201 endpoint_url: &str,
202 ) -> Option<Vec<EndpointDescription>> {
203 debug!("find_endpoint, url = {}", endpoint_url);
204 let config = trace_read_lock!(self.config);
205 let base_endpoint_url = config.base_endpoint_url();
206 let endpoints: Vec<EndpointDescription> = config
207 .endpoints
208 .iter()
209 .filter(|&(_, e)| {
210 url_matches_except_host(&e.endpoint_url(&base_endpoint_url), endpoint_url)
212 })
213 .map(|(_, e)| self.new_endpoint_description(&config, e, false))
214 .collect();
215 if endpoints.is_empty() {
216 None
217 } else {
218 Some(endpoints)
219 }
220 }
221
222 fn user_pass_security_policy_id(endpoint: &ServerEndpoint) -> UAString {
224 match endpoint.password_security_policy() {
225 SecurityPolicy::None => POLICY_ID_USER_PASS_NONE,
226 SecurityPolicy::Basic128Rsa15 => POLICY_ID_USER_PASS_RSA_15,
227 SecurityPolicy::Basic256 | SecurityPolicy::Basic256Sha256 => {
228 POLICY_ID_USER_PASS_RSA_OAEP
229 }
230 SecurityPolicy::Aes128Sha256RsaOaep | SecurityPolicy::Aes256Sha256RsaPss => {
232 POLICY_ID_USER_PASS_RSA_OAEP
233 }
234 _ => {
235 panic!()
236 }
237 }
238 .into()
239 }
240
241 fn user_pass_security_policy_uri(_endpoint: &ServerEndpoint) -> UAString {
242 UAString::null()
245 }
246
247 fn user_identity_tokens(
248 &self,
249 config: &ServerConfig,
250 endpoint: &ServerEndpoint,
251 ) -> Vec<UserTokenPolicy> {
252 let mut user_identity_tokens = Vec::with_capacity(3);
253
254 if endpoint.supports_anonymous() {
256 user_identity_tokens.push(UserTokenPolicy {
257 policy_id: UAString::from(POLICY_ID_ANONYMOUS),
258 token_type: UserTokenType::Anonymous,
259 issued_token_type: UAString::null(),
260 issuer_endpoint_url: UAString::null(),
261 security_policy_uri: UAString::null(),
262 });
263 }
264 if endpoint.supports_user_pass(&config.user_tokens) {
266 user_identity_tokens.push(UserTokenPolicy {
268 policy_id: Self::user_pass_security_policy_id(endpoint),
269 token_type: UserTokenType::UserName,
270 issued_token_type: UAString::null(),
271 issuer_endpoint_url: UAString::null(),
272 security_policy_uri: Self::user_pass_security_policy_uri(endpoint),
273 });
274 }
275 if endpoint.supports_x509(&config.user_tokens) {
277 user_identity_tokens.push(UserTokenPolicy {
278 policy_id: UAString::from(POLICY_ID_X509),
279 token_type: UserTokenType::Certificate,
280 issued_token_type: UAString::null(),
281 issuer_endpoint_url: UAString::null(),
282 security_policy_uri: UAString::from(SecurityPolicy::Basic128Rsa15.to_uri()),
283 });
284 }
285
286 if user_identity_tokens.is_empty() {
287 debug!(
288 "user_identity_tokens() returned zero endpoints for endpoint {} / {} {}",
289 endpoint.path, endpoint.security_policy, endpoint.security_mode
290 );
291 }
292
293 user_identity_tokens
294 }
295
296 fn new_endpoint_description(
298 &self,
299 config: &ServerConfig,
300 endpoint: &ServerEndpoint,
301 all_fields: bool,
302 ) -> EndpointDescription {
303 let base_endpoint_url = config.base_endpoint_url();
304
305 let user_identity_tokens = self.user_identity_tokens(config, endpoint);
306
307 let (server, server_certificate) = if all_fields {
311 (
312 ApplicationDescription {
313 application_uri: self.application_uri.clone(),
314 product_uri: self.product_uri.clone(),
315 application_name: self.application_name.clone(),
316 application_type: self.application_type(),
317 gateway_server_uri: self.gateway_server_uri(),
318 discovery_profile_uri: UAString::null(),
319 discovery_urls: self.discovery_urls(),
320 },
321 self.server_certificate_as_byte_string(),
322 )
323 } else {
324 (
325 ApplicationDescription {
326 application_uri: UAString::null(),
327 product_uri: UAString::null(),
328 application_name: LocalizedText::null(),
329 application_type: self.application_type(),
330 gateway_server_uri: self.gateway_server_uri(),
331 discovery_profile_uri: UAString::null(),
332 discovery_urls: self.discovery_urls(),
333 },
334 ByteString::null(),
335 )
336 };
337
338 EndpointDescription {
339 endpoint_url: endpoint.endpoint_url(&base_endpoint_url).into(),
340 server,
341 server_certificate,
342 security_mode: endpoint.message_security_mode(),
343 security_policy_uri: UAString::from(endpoint.security_policy().to_uri()),
344 user_identity_tokens: Some(user_identity_tokens),
345 transport_profile_uri: UAString::from(profiles::TRANSPORT_PROFILE_URI_BINARY),
346 security_level: endpoint.security_level,
347 }
348 }
349
350 pub fn discovery_urls(&self) -> Option<Vec<UAString>> {
351 let config = trace_read_lock!(self.config);
352 if config.discovery_urls.is_empty() {
353 None
354 } else {
355 Some(config.discovery_urls.iter().map(UAString::from).collect())
356 }
357 }
358
359 pub fn application_type(&self) -> ApplicationType {
360 ApplicationType::Server
361 }
362
363 pub fn gateway_server_uri(&self) -> UAString {
364 UAString::null()
365 }
366
367 pub fn abort(&mut self) {
368 info!("Server has been told to abort");
369 self.abort = true;
370 self.state = ServerStateType::Shutdown;
371 }
372
373 pub fn state(&self) -> ServerStateType {
374 self.state
375 }
376
377 pub fn set_state(&mut self, state: ServerStateType) {
378 self.state = state;
379 }
380
381 pub fn is_abort(&self) -> bool {
382 self.abort
383 }
384
385 pub fn is_running(&self) -> bool {
386 self.state == ServerStateType::Running
387 }
388
389 pub fn server_certificate_as_byte_string(&self) -> ByteString {
390 if let Some(ref server_certificate) = self.server_certificate {
391 server_certificate.as_byte_string()
392 } else {
393 ByteString::null()
394 }
395 }
396
397 pub fn registered_server(&self) -> RegisteredServer {
398 let server_uri = self.application_uri.clone();
399 let product_uri = self.product_uri.clone();
400 let gateway_server_uri = self.gateway_server_uri();
401 let discovery_urls = self.discovery_urls();
402 let server_type = self.application_type();
403 let is_online = self.is_running();
404 let server_names = Some(vec![self.application_name.clone()]);
405 RegisteredServer {
407 server_uri,
408 product_uri,
409 server_names,
410 server_type,
411 gateway_server_uri,
412 discovery_urls,
413 semaphore_file_path: UAString::null(),
414 is_online,
415 }
416 }
417
418 pub fn create_subscription_id(&mut self) -> u32 {
419 self.last_subscription_id += 1;
420 self.last_subscription_id
421 }
422
423 pub fn authenticate_endpoint(
430 &self,
431 request: &ActivateSessionRequest,
432 endpoint_url: &str,
433 security_policy: SecurityPolicy,
434 security_mode: MessageSecurityMode,
435 user_identity_token: &ExtensionObject,
436 server_nonce: &ByteString,
437 ) -> Result<String, StatusCode> {
438 let config = trace_read_lock!(self.config);
440
441 if let Some(endpoint) = config.find_endpoint(endpoint_url, security_policy, security_mode) {
442 match IdentityToken::new(user_identity_token, &self.decoding_options()) {
444 IdentityToken::None => {
445 error!("User identity token type unsupported");
446 Err(StatusCode::BadIdentityTokenInvalid)
447 }
448 IdentityToken::AnonymousIdentityToken(token) => {
449 Self::authenticate_anonymous_token(endpoint, &token)
450 }
451 IdentityToken::UserNameIdentityToken(token) => self
452 .authenticate_username_identity_token(
453 &config,
454 endpoint,
455 &token,
456 &self.server_pkey,
457 server_nonce,
458 ),
459 IdentityToken::X509IdentityToken(token) => self.authenticate_x509_identity_token(
460 &config,
461 endpoint,
462 &token,
463 &request.user_token_signature,
464 &self.server_certificate,
465 server_nonce,
466 ),
467 IdentityToken::Invalid(o) => {
468 error!("User identity token type {:?} is unsupported", o.node_id);
469 Err(StatusCode::BadIdentityTokenInvalid)
470 }
471 }
472 } else {
473 error!("Cannot find endpoint that matches path \"{}\", security policy {:?}, and security mode {:?}", endpoint_url, security_policy, security_mode);
474 Err(StatusCode::BadTcpEndpointUrlInvalid)
475 }
476 }
477
478 pub fn set_register_nodes_callbacks(
479 &mut self,
480 register_nodes_callback: Box<dyn RegisterNodes + Send + Sync>,
481 unregister_nodes_callback: Box<dyn UnregisterNodes + Send + Sync>,
482 ) {
483 self.register_nodes_callback = Some(register_nodes_callback);
484 self.unregister_nodes_callback = Some(unregister_nodes_callback);
485 }
486
487 pub fn decoding_options(&self) -> DecodingOptions {
489 let config = trace_read_lock!(self.config);
490 config.decoding_options()
491 }
492
493 fn authenticate_anonymous_token(
495 endpoint: &ServerEndpoint,
496 token: &AnonymousIdentityToken,
497 ) -> Result<String, StatusCode> {
498 if token.policy_id.as_ref() != POLICY_ID_ANONYMOUS {
499 error!("Token doesn't possess the correct policy id");
500 Err(StatusCode::BadIdentityTokenInvalid)
501 } else if !endpoint.supports_anonymous() {
502 error!(
503 "Endpoint \"{}\" does not support anonymous authentication",
504 endpoint.path
505 );
506 Err(StatusCode::BadIdentityTokenRejected)
507 } else {
508 debug!("Anonymous identity is authenticated");
509 Ok(String::from(crate::config::ANONYMOUS_USER_TOKEN_ID))
510 }
511 }
512
513 fn authenticate_username_identity_token(
516 &self,
517 config: &ServerConfig,
518 endpoint: &ServerEndpoint,
519 token: &UserNameIdentityToken,
520 server_key: &Option<PrivateKey>,
521 server_nonce: &ByteString,
522 ) -> Result<String, StatusCode> {
523 if !endpoint.supports_user_pass(&config.user_tokens) {
524 error!("Endpoint doesn't support username password tokens");
525 Err(StatusCode::BadIdentityTokenRejected)
526 } else if token.policy_id != Self::user_pass_security_policy_id(endpoint) {
527 error!("Token doesn't possess the correct policy id");
528 Err(StatusCode::BadIdentityTokenInvalid)
529 } else if token.user_name.is_null() {
530 error!("User identify token supplies no user name");
531 Err(StatusCode::BadIdentityTokenInvalid)
532 } else {
533 debug!(
534 "policy id = {}, encryption algorithm = {}",
535 token.policy_id.as_ref(),
536 token.encryption_algorithm.as_ref()
537 );
538 let token_password = if !token.encryption_algorithm.is_null() {
539 if let Some(ref server_key) = server_key {
540 user_identity::decrypt_user_identity_token_password(
541 token,
542 server_nonce.as_ref(),
543 server_key,
544 )?
545 } else {
546 error!("Identity token password is encrypted but no server private key was supplied");
547 return Err(StatusCode::BadIdentityTokenInvalid);
548 }
549 } else {
550 token.plaintext_password()?
551 };
552
553 for user_token_id in &endpoint.user_token_ids {
555 if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
556 if server_user_token.is_user_pass()
557 && server_user_token.user == token.user_name.as_ref()
558 {
559 let valid = if server_user_token.pass.is_none() {
561 token_password.is_empty()
563 } else {
564 let server_password =
566 server_user_token.pass.as_ref().unwrap().as_bytes();
567 server_password == token_password.as_bytes()
568 };
569 if !valid {
570 error!(
571 "Cannot authenticate \"{}\", password is invalid",
572 server_user_token.user
573 );
574 return Err(StatusCode::BadUserAccessDenied);
575 } else {
576 return Ok(user_token_id.clone());
577 }
578 }
579 }
580 }
581 error!(
582 "Cannot authenticate \"{}\", user not found for endpoint",
583 token.user_name
584 );
585 Err(StatusCode::BadUserAccessDenied)
586 }
587 }
588
589 fn authenticate_x509_identity_token(
592 &self,
593 config: &ServerConfig,
594 endpoint: &ServerEndpoint,
595 token: &X509IdentityToken,
596 user_token_signature: &SignatureData,
597 server_certificate: &Option<X509>,
598 server_nonce: &ByteString,
599 ) -> Result<String, StatusCode> {
600 if !endpoint.supports_x509(&config.user_tokens) {
601 error!("Endpoint doesn't support x509 tokens");
602 Err(StatusCode::BadIdentityTokenRejected)
603 } else if token.policy_id.as_ref() != POLICY_ID_X509 {
604 error!("Token doesn't possess the correct policy id");
605 Err(StatusCode::BadIdentityTokenRejected)
606 } else {
607 let result = match server_certificate {
608 Some(ref server_certificate) => {
609 let user_identity_tokens = self.user_identity_tokens(config, endpoint);
611 let security_policy = user_identity_tokens
612 .iter()
613 .find(|t| t.token_type == UserTokenType::Certificate)
614 .map(|t| SecurityPolicy::from_uri(t.security_policy_uri.as_ref()))
615 .unwrap_or_else(|| endpoint.security_policy());
616
617 match security_policy {
619 SecurityPolicy::Unknown | SecurityPolicy::None => {
620 Err(StatusCode::BadIdentityTokenInvalid)
621 }
622 security_policy => {
623 user_identity::verify_x509_identity_token(
625 token,
626 user_token_signature,
627 security_policy,
628 server_certificate,
629 server_nonce.as_ref(),
630 )
631 }
632 }
633 }
634 None => Err(StatusCode::BadIdentityTokenInvalid),
635 };
636 result.and_then(|_| {
637 let signing_cert = X509::from_byte_string(&token.certificate_data)?;
639 let signing_thumbprint = signing_cert.thumbprint();
640 for user_token_id in &endpoint.user_token_ids {
641 if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
642 if let Some(ref user_thumbprint) = server_user_token.thumbprint {
643 if *user_thumbprint == signing_thumbprint {
645 return Ok(user_token_id.clone());
646 }
647 }
648 }
649 }
650 Err(StatusCode::BadIdentityTokenInvalid)
651 })
652 }
653 }
654
655 pub fn set_historical_data_provider(
656 &mut self,
657 historical_data_provider: Box<dyn HistoricalDataProvider + Send + Sync>,
658 ) {
659 self.historical_data_provider = Some(historical_data_provider);
660 }
661
662 pub fn set_historical_event_provider(
663 &mut self,
664 historical_event_provider: Box<dyn HistoricalEventProvider + Send + Sync>,
665 ) {
666 self.historical_event_provider = Some(historical_event_provider);
667 }
668
669 pub(crate) fn raise_and_log<T>(&self, event: T) -> Result<NodeId, ()>
670 where
671 T: AuditEvent + Event,
672 {
673 let audit_log = trace_write_lock!(self.audit_log);
674 audit_log.raise_and_log(event)
675 }
676}