Skip to main content

opcua/server/
state.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! Provides server state information, such as status, configuration, running servers and so on.
6
7use std::sync::Arc;
8
9use crate::core::prelude::*;
10use crate::crypto::{user_identity, PrivateKey, SecurityPolicy, X509};
11use crate::sync::*;
12use crate::types::{
13    profiles,
14    service_types::{
15        ActivateSessionRequest, AnonymousIdentityToken, ApplicationDescription, ApplicationType,
16        EndpointDescription, RegisteredServer, ServerState as ServerStateType, SignatureData,
17        UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken,
18    },
19    status_code::StatusCode,
20};
21
22use crate::server::{
23    callbacks::{RegisterNodes, UnregisterNodes},
24    config::{ServerConfig, ServerEndpoint},
25    constants,
26    diagnostics::ServerDiagnostics,
27    events::{
28        audit::{AuditEvent, AuditLog},
29        event::Event,
30    },
31    historical::{HistoricalDataProvider, HistoricalEventProvider},
32    identity_token::{
33        IdentityToken, POLICY_ID_ANONYMOUS, POLICY_ID_USER_PASS_NONE, POLICY_ID_USER_PASS_RSA_15,
34        POLICY_ID_USER_PASS_RSA_OAEP, POLICY_ID_X509,
35    },
36};
37
/// Operational limits held as part of the server state.
///
/// Every field is a `usize`. The `Default` implementation in this file initialises each field
/// from the like-named constant in the `constants` module.
// NOTE(review): judging by the field names these cap how many nodes / monitored items a single
// service request may carry - confirm against the services that read them.
pub(crate) struct OperationalLimits {
    pub max_nodes_per_translate_browse_paths_to_node_ids: usize,
    pub max_nodes_per_read: usize,
    pub max_nodes_per_write: usize,
    pub max_nodes_per_method_call: usize,
    pub max_nodes_per_browse: usize,
    pub max_nodes_per_register_nodes: usize,
    pub max_nodes_per_node_management: usize,
    pub max_monitored_items_per_call: usize,
    pub max_nodes_per_history_read_data: usize,
    pub max_nodes_per_history_read_events: usize,
    pub max_nodes_per_history_update_data: usize,
    pub max_nodes_per_history_update_events: usize,
}
52
53impl Default for OperationalLimits {
54    fn default() -> Self {
55        Self {
56            max_nodes_per_translate_browse_paths_to_node_ids:
57                constants::MAX_NODES_PER_TRANSLATE_BROWSE_PATHS_TO_NODE_IDS,
58            max_nodes_per_read: constants::MAX_NODES_PER_READ,
59            max_nodes_per_write: constants::MAX_NODES_PER_WRITE,
60            max_nodes_per_method_call: constants::MAX_NODES_PER_METHOD_CALL,
61            max_nodes_per_browse: constants::MAX_NODES_PER_BROWSE,
62            max_nodes_per_register_nodes: constants::MAX_NODES_PER_REGISTER_NODES,
63            max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
64            max_monitored_items_per_call: constants::MAX_MONITORED_ITEMS_PER_CALL,
65            max_nodes_per_history_read_data: constants::MAX_NODES_PER_HISTORY_READ_DATA,
66            max_nodes_per_history_read_events: constants::MAX_NODES_PER_HISTORY_READ_EVENTS,
67            max_nodes_per_history_update_data: constants::MAX_NODES_PER_HISTORY_UPDATE_DATA,
68            max_nodes_per_history_update_events: constants::MAX_NODES_PER_HISTORY_UPDATE_EVENTS,
69        }
70    }
71}
72
/// Server state is any state associated with the server as a whole that individual sessions might
/// be interested in. That includes configuration info etc.
pub struct ServerState {
    /// The application URI
    pub application_uri: UAString,
    /// The product URI
    pub product_uri: UAString,
    /// The application name
    pub application_name: LocalizedText,
    /// The protocol, hostname and port formatted as a url, but less the path
    pub base_endpoint: String,
    /// The time the server started
    pub start_time: DateTime,
    /// The list of servers (by urn)
    pub servers: Vec<String>,
    /// Server configuration, shared behind a read/write lock
    pub config: Arc<RwLock<ServerConfig>>,
    /// Server public certificate read from config location or `None` if there is none
    pub server_certificate: Option<X509>,
    /// Server private key, or `None` if there is none
    pub server_pkey: Option<PrivateKey>,
    /// The most recently issued subscription id - `create_subscription_id` increments this value
    /// and returns the result. Subscriptions are shared across the whole server. Initial value
    /// is a random u32.
    pub last_subscription_id: u32,
    /// Maximum number of subscriptions per session, 0 means no limit (danger)
    pub max_subscriptions: usize,
    /// Maximum number of monitored items per subscription, 0 means no limit (danger)
    pub max_monitored_items_per_sub: usize,
    /// Maximum number of queued values in a monitored item, 0 means no limit (danger)
    pub max_monitored_item_queue_size: usize,
    /// Minimum publishing interval (in millis)
    pub min_publishing_interval_ms: Duration,
    /// Minimum sampling interval (in millis)
    pub min_sampling_interval_ms: Duration,
    /// Default keep alive count
    pub default_keep_alive_count: u32,
    /// Maximum keep alive count
    pub max_keep_alive_count: u32,
    /// Maximum lifetime count (3 times as large as max keep alive)
    pub max_lifetime_count: u32,
    /// Operational limits
    pub(crate) operational_limits: OperationalLimits,
    /// Current state
    pub state: ServerStateType,
    /// The abort flag that terminates the associated server; set to `true` by `abort()`
    pub abort: bool,
    /// Audit log
    pub(crate) audit_log: Arc<RwLock<AuditLog>>,
    /// Diagnostic information
    pub(crate) diagnostics: Arc<RwLock<ServerDiagnostics>>,
    /// Callback for register nodes, `None` until one is installed
    pub(crate) register_nodes_callback: Option<Box<dyn RegisterNodes + Send + Sync>>,
    /// Callback for unregister nodes, `None` until one is installed
    pub(crate) unregister_nodes_callback: Option<Box<dyn UnregisterNodes + Send + Sync>>,
    /// Callback for historical data, `None` until one is installed
    pub(crate) historical_data_provider: Option<Box<dyn HistoricalDataProvider + Send + Sync>>,
    /// Callback for historical events, `None` until one is installed
    pub(crate) historical_event_provider: Option<Box<dyn HistoricalEventProvider + Send + Sync>>,
    /// Size of the send buffer in bytes
    pub send_buffer_size: usize,
    /// Size of the receive buffer in bytes
    pub receive_buffer_size: usize,
}
136
137impl ServerState {
138    pub fn endpoints(
139        &self,
140        endpoint_url: &UAString,
141        transport_profile_uris: &Option<Vec<UAString>>,
142    ) -> Option<Vec<EndpointDescription>> {
143        // Filter endpoints based on profile_uris
144        debug!(
145            "Endpoints requested, transport profile uris {:?}",
146            transport_profile_uris
147        );
148        if let Some(ref transport_profile_uris) = *transport_profile_uris {
149            // Note - some clients pass an empty array
150            if !transport_profile_uris.is_empty() {
151                // As we only support binary transport, the result is None if the supplied profile_uris does not contain that profile
152                let found_binary_transport = transport_profile_uris.iter().any(|profile_uri| {
153                    profile_uri.as_ref() == profiles::TRANSPORT_PROFILE_URI_BINARY
154                });
155                if !found_binary_transport {
156                    error!(
157                        "Client wants to connect with a non binary transport {:#?}",
158                        transport_profile_uris
159                    );
160                    return None;
161                }
162            }
163        }
164
165        let config = trace_read_lock!(self.config);
166        if let Ok(hostname) = hostname_from_url(endpoint_url.as_ref()) {
167            if !hostname.eq_ignore_ascii_case(&config.tcp_config.host) {
168                debug!("Endpoint url \"{}\" hostname supplied by caller does not match server's hostname \"{}\"", endpoint_url, &config.tcp_config.host);
169            }
170            let endpoints = config
171                .endpoints
172                .iter()
173                .map(|(_, e)| self.new_endpoint_description(&config, e, true))
174                .collect();
175            Some(endpoints)
176        } else {
177            warn!(
178                "Endpoint url \"{}\" is unrecognized, using default",
179                endpoint_url
180            );
181            if let Some(e) = config.default_endpoint() {
182                Some(vec![self.new_endpoint_description(&config, e, true)])
183            } else {
184                Some(vec![])
185            }
186        }
187    }
188
189    pub fn endpoint_exists(
190        &self,
191        endpoint_url: &str,
192        security_policy: SecurityPolicy,
193        security_mode: MessageSecurityMode,
194    ) -> bool {
195        let config = trace_read_lock!(self.config);
196        config
197            .find_endpoint(endpoint_url, security_policy, security_mode)
198            .is_some()
199    }
200
201    /// Make matching endpoint descriptions for the specified url.
202    /// If none match then None will be passed, therefore if Some is returned it will be guaranteed
203    /// to contain at least one result.
204    pub fn new_endpoint_descriptions(
205        &self,
206        endpoint_url: &str,
207    ) -> Option<Vec<EndpointDescription>> {
208        debug!("find_endpoint, url = {}", endpoint_url);
209        let config = trace_read_lock!(self.config);
210        let base_endpoint_url = config.base_endpoint_url();
211        let endpoints: Vec<EndpointDescription> = config
212            .endpoints
213            .iter()
214            .filter(|&(_, e)| {
215                // Test end point's security_policy_uri and matching url
216                url_matches_except_host(&e.endpoint_url(&base_endpoint_url), endpoint_url)
217            })
218            .map(|(_, e)| self.new_endpoint_description(&config, e, false))
219            .collect();
220        if endpoints.is_empty() {
221            None
222        } else {
223            Some(endpoints)
224        }
225    }
226
227    /// Determine what user/pass encryption to use depending on the security policy.
228    fn user_pass_security_policy_id(endpoint: &ServerEndpoint) -> UAString {
229        match endpoint.password_security_policy() {
230            SecurityPolicy::None => POLICY_ID_USER_PASS_NONE,
231            SecurityPolicy::Basic128Rsa15 => POLICY_ID_USER_PASS_RSA_15,
232            SecurityPolicy::Basic256 | SecurityPolicy::Basic256Sha256 => {
233                POLICY_ID_USER_PASS_RSA_OAEP
234            }
235            // TODO this is a placeholder
236            SecurityPolicy::Aes128Sha256RsaOaep | SecurityPolicy::Aes256Sha256RsaPss => {
237                POLICY_ID_USER_PASS_RSA_OAEP
238            }
239            _ => {
240                panic!()
241            }
242        }
243        .into()
244    }
245
    /// Returns the security policy uri to advertise for user/pass tokens.
    ///
    /// The endpoint argument is currently unused and the result is always a null string.
    fn user_pass_security_policy_uri(_endpoint: &ServerEndpoint) -> UAString {
        // TODO we could force the security policy uri for passwords to be something other than the default
        //  here to ensure they're secure even when the endpoint's security policy is None.
        UAString::null()
    }
251
252    fn user_identity_tokens(
253        &self,
254        config: &ServerConfig,
255        endpoint: &ServerEndpoint,
256    ) -> Vec<UserTokenPolicy> {
257        let mut user_identity_tokens = Vec::with_capacity(3);
258
259        // Anonymous policy
260        if endpoint.supports_anonymous() {
261            user_identity_tokens.push(UserTokenPolicy {
262                policy_id: UAString::from(POLICY_ID_ANONYMOUS),
263                token_type: UserTokenType::Anonymous,
264                issued_token_type: UAString::null(),
265                issuer_endpoint_url: UAString::null(),
266                security_policy_uri: UAString::null(),
267            });
268        }
269        // User pass policy
270        if endpoint.supports_user_pass(&config.user_tokens) {
271            // The endpoint may set a password security policy
272            user_identity_tokens.push(UserTokenPolicy {
273                policy_id: Self::user_pass_security_policy_id(endpoint),
274                token_type: UserTokenType::UserName,
275                issued_token_type: UAString::null(),
276                issuer_endpoint_url: UAString::null(),
277                security_policy_uri: Self::user_pass_security_policy_uri(endpoint),
278            });
279        }
280        // X509 policy
281        if endpoint.supports_x509(&config.user_tokens) {
282            user_identity_tokens.push(UserTokenPolicy {
283                policy_id: UAString::from(POLICY_ID_X509),
284                token_type: UserTokenType::Certificate,
285                issued_token_type: UAString::null(),
286                issuer_endpoint_url: UAString::null(),
287                security_policy_uri: UAString::from(SecurityPolicy::Basic128Rsa15.to_uri()),
288            });
289        }
290
291        if user_identity_tokens.is_empty() {
292            debug!(
293                "user_identity_tokens() returned zero endpoints for endpoint {} / {} {}",
294                endpoint.path, endpoint.security_policy, endpoint.security_mode
295            );
296        }
297
298        user_identity_tokens
299    }
300
301    /// Constructs a new endpoint description using the server's info and that in an Endpoint
302    fn new_endpoint_description(
303        &self,
304        config: &ServerConfig,
305        endpoint: &ServerEndpoint,
306        all_fields: bool,
307    ) -> EndpointDescription {
308        let base_endpoint_url = config.base_endpoint_url();
309
310        let user_identity_tokens = self.user_identity_tokens(config, endpoint);
311
312        // CreateSession doesn't need all the endpoint description
313        // and docs say not to bother sending the server and server
314        // certificate info.
315        let (server, server_certificate) = if all_fields {
316            (
317                ApplicationDescription {
318                    application_uri: self.application_uri.clone(),
319                    product_uri: self.product_uri.clone(),
320                    application_name: self.application_name.clone(),
321                    application_type: self.application_type(),
322                    gateway_server_uri: self.gateway_server_uri(),
323                    discovery_profile_uri: UAString::null(),
324                    discovery_urls: self.discovery_urls(),
325                },
326                self.server_certificate_as_byte_string(),
327            )
328        } else {
329            (
330                ApplicationDescription {
331                    application_uri: UAString::null(),
332                    product_uri: UAString::null(),
333                    application_name: LocalizedText::null(),
334                    application_type: self.application_type(),
335                    gateway_server_uri: self.gateway_server_uri(),
336                    discovery_profile_uri: UAString::null(),
337                    discovery_urls: self.discovery_urls(),
338                },
339                ByteString::null(),
340            )
341        };
342
343        EndpointDescription {
344            endpoint_url: endpoint.endpoint_url(&base_endpoint_url).into(),
345            server,
346            server_certificate,
347            security_mode: endpoint.message_security_mode(),
348            security_policy_uri: UAString::from(endpoint.security_policy().to_uri()),
349            user_identity_tokens: Some(user_identity_tokens),
350            transport_profile_uri: UAString::from(profiles::TRANSPORT_PROFILE_URI_BINARY),
351            security_level: endpoint.security_level,
352        }
353    }
354
355    pub fn discovery_urls(&self) -> Option<Vec<UAString>> {
356        let config = trace_read_lock!(self.config);
357        if config.discovery_urls.is_empty() {
358            None
359        } else {
360            Some(config.discovery_urls.iter().map(UAString::from).collect())
361        }
362    }
363
    /// Returns the application type reported by this server, which is always
    /// `ApplicationType::Server`.
    pub fn application_type(&self) -> ApplicationType {
        ApplicationType::Server
    }
367
    /// Returns the gateway server uri reported by this server, which is always a null string.
    pub fn gateway_server_uri(&self) -> UAString {
        UAString::null()
    }
371
    /// Flags the server to abort: logs the request, sets the abort flag and moves the state to
    /// `Shutdown`.
    pub fn abort(&mut self) {
        info!("Server has been told to abort");
        self.abort = true;
        self.state = ServerStateType::Shutdown;
    }
377
    /// Returns the current server state.
    pub fn state(&self) -> ServerStateType {
        self.state
    }
381
    /// Replaces the current server state with `state`.
    pub fn set_state(&mut self, state: ServerStateType) {
        self.state = state;
    }
385
    /// Returns `true` if the abort flag has been set.
    pub fn is_abort(&self) -> bool {
        self.abort
    }
389
390    pub fn is_running(&self) -> bool {
391        self.state == ServerStateType::Running
392    }
393
394    pub fn server_certificate_as_byte_string(&self) -> ByteString {
395        if let Some(ref server_certificate) = self.server_certificate {
396            server_certificate.as_byte_string()
397        } else {
398            ByteString::null()
399        }
400    }
401
402    pub fn registered_server(&self) -> RegisteredServer {
403        let server_uri = self.application_uri.clone();
404        let product_uri = self.product_uri.clone();
405        let gateway_server_uri = self.gateway_server_uri();
406        let discovery_urls = self.discovery_urls();
407        let server_type = self.application_type();
408        let is_online = self.is_running();
409        let server_names = Some(vec![self.application_name.clone()]);
410        // Server names
411        RegisteredServer {
412            server_uri,
413            product_uri,
414            server_names,
415            server_type,
416            gateway_server_uri,
417            discovery_urls,
418            semaphore_file_path: UAString::null(),
419            is_online,
420        }
421    }
422
423    pub fn create_subscription_id(&mut self) -> u32 {
424        self.last_subscription_id += 1;
425        self.last_subscription_id
426    }
427
428    /// Authenticates access to an endpoint. The endpoint is described by its path, policy, mode and
429    /// the token is supplied in an extension object that must be extracted and authenticated.
430    ///
431    /// It is possible that the endpoint does not exist, or that the token is invalid / unsupported
432    /// or that the token cannot be used with the end point. The return codes reflect the responses
433    /// that ActivateSession would expect from a service call.
434    pub fn authenticate_endpoint(
435        &self,
436        request: &ActivateSessionRequest,
437        endpoint_url: &str,
438        security_policy: SecurityPolicy,
439        security_mode: MessageSecurityMode,
440        user_identity_token: &ExtensionObject,
441        server_nonce: &ByteString,
442    ) -> Result<String, StatusCode> {
443        // Get security from endpoint url
444        let config = trace_read_lock!(self.config);
445
446        if let Some(endpoint) = config.find_endpoint(endpoint_url, security_policy, security_mode) {
447            // Now validate the user identity token
448            match IdentityToken::new(user_identity_token, &self.decoding_options()) {
449                IdentityToken::None => {
450                    error!("User identity token type unsupported");
451                    Err(StatusCode::BadIdentityTokenInvalid)
452                }
453                IdentityToken::AnonymousIdentityToken(token) => {
454                    Self::authenticate_anonymous_token(endpoint, &token)
455                }
456                IdentityToken::UserNameIdentityToken(token) => self
457                    .authenticate_username_identity_token(
458                        &config,
459                        endpoint,
460                        &token,
461                        &self.server_pkey,
462                        server_nonce,
463                    ),
464                IdentityToken::X509IdentityToken(token) => self.authenticate_x509_identity_token(
465                    &config,
466                    endpoint,
467                    &token,
468                    &request.user_token_signature,
469                    &self.server_certificate,
470                    server_nonce,
471                ),
472                IdentityToken::Invalid(o) => {
473                    error!("User identity token type {:?} is unsupported", o.node_id);
474                    Err(StatusCode::BadIdentityTokenInvalid)
475                }
476            }
477        } else {
478            error!("Cannot find endpoint that matches path \"{}\", security policy {:?}, and security mode {:?}", endpoint_url, security_policy, security_mode);
479            Err(StatusCode::BadTcpEndpointUrlInvalid)
480        }
481    }
482
    /// Installs the register nodes and unregister nodes callbacks, replacing any that were
    /// previously set.
    pub fn set_register_nodes_callbacks(
        &mut self,
        register_nodes_callback: Box<dyn RegisterNodes + Send + Sync>,
        unregister_nodes_callback: Box<dyn UnregisterNodes + Send + Sync>,
    ) {
        self.register_nodes_callback = Some(register_nodes_callback);
        self.unregister_nodes_callback = Some(unregister_nodes_callback);
    }
491
    /// Returns the decoding options of the server, as reported by the server configuration.
    ///
    /// Takes a read lock on the configuration for the duration of the call.
    pub fn decoding_options(&self) -> DecodingOptions {
        let config = trace_read_lock!(self.config);
        config.decoding_options()
    }
497
498    /// Authenticates an anonymous token, i.e. does the endpoint support anonymous access or not
499    fn authenticate_anonymous_token(
500        endpoint: &ServerEndpoint,
501        token: &AnonymousIdentityToken,
502    ) -> Result<String, StatusCode> {
503        if token.policy_id.as_ref() != POLICY_ID_ANONYMOUS {
504            error!("Token doesn't possess the correct policy id");
505            Err(StatusCode::BadIdentityTokenInvalid)
506        } else if !endpoint.supports_anonymous() {
507            error!(
508                "Endpoint \"{}\" does not support anonymous authentication",
509                endpoint.path
510            );
511            Err(StatusCode::BadIdentityTokenRejected)
512        } else {
513            debug!("Anonymous identity is authenticated");
514            Ok(String::from(crate::server::config::ANONYMOUS_USER_TOKEN_ID))
515        }
516    }
517
518    /// Authenticates the username identity token with the supplied endpoint. The function returns the user token identifier
519    /// that matches the identity token.
520    fn authenticate_username_identity_token(
521        &self,
522        config: &ServerConfig,
523        endpoint: &ServerEndpoint,
524        token: &UserNameIdentityToken,
525        server_key: &Option<PrivateKey>,
526        server_nonce: &ByteString,
527    ) -> Result<String, StatusCode> {
528        if !endpoint.supports_user_pass(&config.user_tokens) {
529            error!("Endpoint doesn't support username password tokens");
530            Err(StatusCode::BadIdentityTokenRejected)
531        } else if token.policy_id != Self::user_pass_security_policy_id(endpoint) {
532            error!("Token doesn't possess the correct policy id");
533            Err(StatusCode::BadIdentityTokenInvalid)
534        } else if token.user_name.is_null() {
535            error!("User identify token supplies no user name");
536            Err(StatusCode::BadIdentityTokenInvalid)
537        } else {
538            debug!(
539                "policy id = {}, encryption algorithm = {}",
540                token.policy_id.as_ref(),
541                token.encryption_algorithm.as_ref()
542            );
543            let token_password = if !token.encryption_algorithm.is_null() {
544                if let Some(ref server_key) = server_key {
545                    user_identity::decrypt_user_identity_token_password(
546                        token,
547                        server_nonce.as_ref(),
548                        server_key,
549                    )?
550                } else {
551                    error!("Identity token password is encrypted but no server private key was supplied");
552                    return Err(StatusCode::BadIdentityTokenInvalid);
553                }
554            } else {
555                token.plaintext_password()?
556            };
557
558            // Iterate ids in endpoint
559            for user_token_id in &endpoint.user_token_ids {
560                if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
561                    if server_user_token.is_user_pass()
562                        && server_user_token.user == token.user_name.as_ref()
563                    {
564                        // test for empty password
565                        let valid = if server_user_token.pass.is_none() {
566                            // Empty password for user
567                            token_password.is_empty()
568                        } else {
569                            // Password compared as UTF-8 bytes
570                            let server_password =
571                                server_user_token.pass.as_ref().unwrap().as_bytes();
572                            server_password == token_password.as_bytes()
573                        };
574                        if !valid {
575                            error!(
576                                "Cannot authenticate \"{}\", password is invalid",
577                                server_user_token.user
578                            );
579                            return Err(StatusCode::BadUserAccessDenied);
580                        } else {
581                            return Ok(user_token_id.clone());
582                        }
583                    }
584                }
585            }
586            error!(
587                "Cannot authenticate \"{}\", user not found for endpoint",
588                token.user_name
589            );
590            Err(StatusCode::BadUserAccessDenied)
591        }
592    }
593
594    /// Authenticate the x509 token against the endpoint. The function returns the user token identifier
595    /// that matches the identity token.
596    fn authenticate_x509_identity_token(
597        &self,
598        config: &ServerConfig,
599        endpoint: &ServerEndpoint,
600        token: &X509IdentityToken,
601        user_token_signature: &SignatureData,
602        server_certificate: &Option<X509>,
603        server_nonce: &ByteString,
604    ) -> Result<String, StatusCode> {
605        if !endpoint.supports_x509(&config.user_tokens) {
606            error!("Endpoint doesn't support x509 tokens");
607            Err(StatusCode::BadIdentityTokenRejected)
608        } else if token.policy_id.as_ref() != POLICY_ID_X509 {
609            error!("Token doesn't possess the correct policy id");
610            Err(StatusCode::BadIdentityTokenRejected)
611        } else {
612            let result = match server_certificate {
613                Some(ref server_certificate) => {
614                    // Find the security policy used for verifying tokens
615                    let user_identity_tokens = self.user_identity_tokens(config, endpoint);
616                    let security_policy = user_identity_tokens
617                        .iter()
618                        .find(|t| t.token_type == UserTokenType::Certificate)
619                        .map(|t| SecurityPolicy::from_uri(t.security_policy_uri.as_ref()))
620                        .unwrap_or_else(|| endpoint.security_policy());
621
622                    // The security policy has to be something that can encrypt
623                    match security_policy {
624                        SecurityPolicy::Unknown | SecurityPolicy::None => {
625                            Err(StatusCode::BadIdentityTokenInvalid)
626                        }
627                        security_policy => {
628                            // Verify token
629                            user_identity::verify_x509_identity_token(
630                                token,
631                                user_token_signature,
632                                security_policy,
633                                server_certificate,
634                                server_nonce.as_ref(),
635                            )
636                        }
637                    }
638                }
639                None => Err(StatusCode::BadIdentityTokenInvalid),
640            };
641            result.and_then(|_| {
642                // Check the endpoint to see if this token is supported
643                let signing_cert = X509::from_byte_string(&token.certificate_data)?;
644                let signing_thumbprint = signing_cert.thumbprint();
645                for user_token_id in &endpoint.user_token_ids {
646                    if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
647                        if let Some(ref user_thumbprint) = server_user_token.thumbprint {
648                            // The signing cert matches a user's identity, so it is valid
649                            if *user_thumbprint == signing_thumbprint {
650                                return Ok(user_token_id.clone());
651                            }
652                        }
653                    }
654                }
655                Err(StatusCode::BadIdentityTokenInvalid)
656            })
657        }
658    }
659
    /// Installs the historical data provider, replacing any that was previously set.
    pub fn set_historical_data_provider(
        &mut self,
        historical_data_provider: Box<dyn HistoricalDataProvider + Send + Sync>,
    ) {
        self.historical_data_provider = Some(historical_data_provider);
    }
666
    /// Installs the historical event provider, replacing any that was previously set.
    pub fn set_historical_event_provider(
        &mut self,
        historical_event_provider: Box<dyn HistoricalEventProvider + Send + Sync>,
    ) {
        self.historical_event_provider = Some(historical_event_provider);
    }
673
    /// Passes an audit event to the server's audit log.
    ///
    /// Takes the write lock on the audit log and forwards `event` to the log's own
    /// `raise_and_log`, returning its result unchanged.
    pub(crate) fn raise_and_log<T>(&self, event: T) -> Result<NodeId, ()>
    where
        T: AuditEvent + Event,
    {
        let audit_log = trace_write_lock!(self.audit_log);
        audit_log.raise_and_log(event)
    }
681}