opcua_server/
state.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! Provides server state information, such as status, configuration, running servers and so on.
6
7use std::sync::{Arc, RwLock};
8
9use opcua_core::prelude::*;
10use opcua_crypto::{user_identity, PrivateKey, SecurityPolicy, X509};
11use opcua_types::{
12    profiles,
13    service_types::{
14        ActivateSessionRequest, AnonymousIdentityToken, ApplicationDescription, ApplicationType,
15        EndpointDescription, RegisteredServer, ServerState as ServerStateType, SignatureData,
16        UserNameIdentityToken, UserTokenPolicy, UserTokenType, X509IdentityToken,
17    },
18    status_code::StatusCode,
19};
20
21use crate::{
22    callbacks::{RegisterNodes, UnregisterNodes},
23    config::{ServerConfig, ServerEndpoint},
24    constants,
25    diagnostics::ServerDiagnostics,
26    events::{
27        audit::{AuditEvent, AuditLog},
28        event::Event,
29    },
30    historical::{HistoricalDataProvider, HistoricalEventProvider},
31    identity_token::{
32        IdentityToken, POLICY_ID_ANONYMOUS, POLICY_ID_USER_PASS_NONE, POLICY_ID_USER_PASS_RSA_15,
33        POLICY_ID_USER_PASS_RSA_OAEP, POLICY_ID_X509,
34    },
35};
36
/// Upper bounds on how many items a single service request may carry.
///
/// Each field holds the limit for one kind of operation. The `Default` implementation fills
/// every field from the matching value in the crate's `constants` module.
pub(crate) struct OperationalLimits {
    /// Limit on nodes per translate-browse-paths-to-node-ids request.
    pub max_nodes_per_translate_browse_paths_to_node_ids: usize,
    /// Limit on nodes per read request.
    pub max_nodes_per_read: usize,
    /// Limit on nodes per write request.
    pub max_nodes_per_write: usize,
    /// Limit on nodes per method call request.
    pub max_nodes_per_method_call: usize,
    /// Limit on nodes per browse request.
    pub max_nodes_per_browse: usize,
    /// Limit on nodes per register nodes request.
    pub max_nodes_per_register_nodes: usize,
    /// Limit on nodes per node management request.
    pub max_nodes_per_node_management: usize,
    /// Limit on monitored items per call.
    pub max_monitored_items_per_call: usize,
    /// Limit on nodes per history read (data) request.
    pub max_nodes_per_history_read_data: usize,
    /// Limit on nodes per history read (events) request.
    pub max_nodes_per_history_read_events: usize,
    /// Limit on nodes per history update (data) request.
    pub max_nodes_per_history_update_data: usize,
    /// Limit on nodes per history update (events) request.
    pub max_nodes_per_history_update_events: usize,
}
51
52impl Default for OperationalLimits {
53    fn default() -> Self {
54        Self {
55            max_nodes_per_translate_browse_paths_to_node_ids:
56                constants::MAX_NODES_PER_TRANSLATE_BROWSE_PATHS_TO_NODE_IDS,
57            max_nodes_per_read: constants::MAX_NODES_PER_READ,
58            max_nodes_per_write: constants::MAX_NODES_PER_WRITE,
59            max_nodes_per_method_call: constants::MAX_NODES_PER_METHOD_CALL,
60            max_nodes_per_browse: constants::MAX_NODES_PER_BROWSE,
61            max_nodes_per_register_nodes: constants::MAX_NODES_PER_REGISTER_NODES,
62            max_nodes_per_node_management: constants::MAX_NODES_PER_NODE_MANAGEMENT,
63            max_monitored_items_per_call: constants::MAX_MONITORED_ITEMS_PER_CALL,
64            max_nodes_per_history_read_data: constants::MAX_NODES_PER_HISTORY_READ_DATA,
65            max_nodes_per_history_read_events: constants::MAX_NODES_PER_HISTORY_READ_EVENTS,
66            max_nodes_per_history_update_data: constants::MAX_NODES_PER_HISTORY_UPDATE_DATA,
67            max_nodes_per_history_update_events: constants::MAX_NODES_PER_HISTORY_UPDATE_EVENTS,
68        }
69    }
70}
71
72/// Server state is any state associated with the server as a whole that individual sessions might
73/// be interested in. That includes configuration info etc.
74pub struct ServerState {
75    /// The application URI
76    pub application_uri: UAString,
77    /// The product URI
78    pub product_uri: UAString,
79    /// The application name
80    pub application_name: LocalizedText,
81    /// The protocol, hostname and port formatted as a url, but less the path
82    pub base_endpoint: String,
83    /// The time the server started
84    pub start_time: DateTime,
85    /// The list of servers (by urn)
86    pub servers: Vec<String>,
87    /// Server configuration
88    pub config: Arc<RwLock<ServerConfig>>,
89    /// Server public certificate read from config location or null if there is none
90    pub server_certificate: Option<X509>,
91    /// Server private key
92    pub server_pkey: Option<PrivateKey>,
93    /// The next subscription id - subscriptions are shared across the whole server. Initial value
94    /// is a random u32.
95    pub last_subscription_id: u32,
96    /// Maximum number of subscriptions per session, 0 means no limit (danger)
97    pub max_subscriptions: usize,
98    /// Maximum number of monitored items per subscription, 0 means no limit (danger)
99    pub max_monitored_items_per_sub: usize,
100    /// Maximum number of queued values in a monitored item, 0 means no limit (danger)
101    pub max_monitored_item_queue_size: usize,
102    /// Minimum publishing interval (in millis)
103    pub min_publishing_interval_ms: Duration,
104    /// Minimum sampling interval (in millis)
105    pub min_sampling_interval_ms: Duration,
106    /// Default keep alive count
107    pub default_keep_alive_count: u32,
108    /// Maxmimum keep alive count
109    pub max_keep_alive_count: u32,
110    /// Maximum lifetime count (3 times as large as max keep alive)
111    pub max_lifetime_count: u32,
112    /// Operational limits
113    pub(crate) operational_limits: OperationalLimits,
114    //// Current state
115    pub state: ServerStateType,
116    /// Sets the abort flag that terminates the associated server
117    pub abort: bool,
118    /// Audit log
119    pub(crate) audit_log: Arc<RwLock<AuditLog>>,
120    /// Diagnostic information
121    pub(crate) diagnostics: Arc<RwLock<ServerDiagnostics>>,
122    /// Callback for register nodes
123    pub(crate) register_nodes_callback: Option<Box<dyn RegisterNodes + Send + Sync>>,
124    /// Callback for unregister nodes
125    pub(crate) unregister_nodes_callback: Option<Box<dyn UnregisterNodes + Send + Sync>>,
126    /// Callback for historical data
127    pub(crate) historical_data_provider: Option<Box<dyn HistoricalDataProvider + Send + Sync>>,
128    /// Callback for historical events
129    pub(crate) historical_event_provider: Option<Box<dyn HistoricalEventProvider + Send + Sync>>,
130}
131
132impl ServerState {
133    pub fn endpoints(
134        &self,
135        endpoint_url: &UAString,
136        transport_profile_uris: &Option<Vec<UAString>>,
137    ) -> Option<Vec<EndpointDescription>> {
138        // Filter endpoints based on profile_uris
139        debug!(
140            "Endpoints requested, transport profile uris {:?}",
141            transport_profile_uris
142        );
143        if let Some(ref transport_profile_uris) = *transport_profile_uris {
144            // Note - some clients pass an empty array
145            if !transport_profile_uris.is_empty() {
146                // As we only support binary transport, the result is None if the supplied profile_uris does not contain that profile
147                let found_binary_transport = transport_profile_uris.iter().any(|profile_uri| {
148                    profile_uri.as_ref() == profiles::TRANSPORT_PROFILE_URI_BINARY
149                });
150                if !found_binary_transport {
151                    error!(
152                        "Client wants to connect with a non binary transport {:#?}",
153                        transport_profile_uris
154                    );
155                    return None;
156                }
157            }
158        }
159
160        let config = trace_read_lock!(self.config);
161        if let Ok(hostname) = hostname_from_url(endpoint_url.as_ref()) {
162            if !hostname.eq_ignore_ascii_case(&config.tcp_config.host) {
163                debug!("Endpoint url \"{}\" hostname supplied by caller does not match server's hostname \"{}\"", endpoint_url, &config.tcp_config.host);
164            }
165            let endpoints = config
166                .endpoints
167                .iter()
168                .map(|(_, e)| self.new_endpoint_description(&config, e, true))
169                .collect();
170            Some(endpoints)
171        } else {
172            warn!(
173                "Endpoint url \"{}\" is unrecognized, using default",
174                endpoint_url
175            );
176            if let Some(e) = config.default_endpoint() {
177                Some(vec![self.new_endpoint_description(&config, e, true)])
178            } else {
179                Some(vec![])
180            }
181        }
182    }
183
184    pub fn endpoint_exists(
185        &self,
186        endpoint_url: &str,
187        security_policy: SecurityPolicy,
188        security_mode: MessageSecurityMode,
189    ) -> bool {
190        let config = trace_read_lock!(self.config);
191        config
192            .find_endpoint(endpoint_url, security_policy, security_mode)
193            .is_some()
194    }
195
196    /// Make matching endpoint descriptions for the specified url.
197    /// If none match then None will be passed, therefore if Some is returned it will be guaranteed
198    /// to contain at least one result.
199    pub fn new_endpoint_descriptions(
200        &self,
201        endpoint_url: &str,
202    ) -> Option<Vec<EndpointDescription>> {
203        debug!("find_endpoint, url = {}", endpoint_url);
204        let config = trace_read_lock!(self.config);
205        let base_endpoint_url = config.base_endpoint_url();
206        let endpoints: Vec<EndpointDescription> = config
207            .endpoints
208            .iter()
209            .filter(|&(_, e)| {
210                // Test end point's security_policy_uri and matching url
211                url_matches_except_host(&e.endpoint_url(&base_endpoint_url), endpoint_url)
212            })
213            .map(|(_, e)| self.new_endpoint_description(&config, e, false))
214            .collect();
215        if endpoints.is_empty() {
216            None
217        } else {
218            Some(endpoints)
219        }
220    }
221
222    /// Determine what user/pass encryption to use depending on the security policy.
223    fn user_pass_security_policy_id(endpoint: &ServerEndpoint) -> UAString {
224        match endpoint.password_security_policy() {
225            SecurityPolicy::None => POLICY_ID_USER_PASS_NONE,
226            SecurityPolicy::Basic128Rsa15 => POLICY_ID_USER_PASS_RSA_15,
227            SecurityPolicy::Basic256 | SecurityPolicy::Basic256Sha256 => {
228                POLICY_ID_USER_PASS_RSA_OAEP
229            }
230            // TODO this is a placeholder
231            SecurityPolicy::Aes128Sha256RsaOaep | SecurityPolicy::Aes256Sha256RsaPss => {
232                POLICY_ID_USER_PASS_RSA_OAEP
233            }
234            _ => {
235                panic!()
236            }
237        }
238        .into()
239    }
240
241    fn user_pass_security_policy_uri(_endpoint: &ServerEndpoint) -> UAString {
242        // TODO we could force the security policy uri for passwords to be something other than the default
243        //  here to ensure they're secure even when the endpoint's security policy is None.
244        UAString::null()
245    }
246
247    fn user_identity_tokens(
248        &self,
249        config: &ServerConfig,
250        endpoint: &ServerEndpoint,
251    ) -> Vec<UserTokenPolicy> {
252        let mut user_identity_tokens = Vec::with_capacity(3);
253
254        // Anonymous policy
255        if endpoint.supports_anonymous() {
256            user_identity_tokens.push(UserTokenPolicy {
257                policy_id: UAString::from(POLICY_ID_ANONYMOUS),
258                token_type: UserTokenType::Anonymous,
259                issued_token_type: UAString::null(),
260                issuer_endpoint_url: UAString::null(),
261                security_policy_uri: UAString::null(),
262            });
263        }
264        // User pass policy
265        if endpoint.supports_user_pass(&config.user_tokens) {
266            // The endpoint may set a password security policy
267            user_identity_tokens.push(UserTokenPolicy {
268                policy_id: Self::user_pass_security_policy_id(endpoint),
269                token_type: UserTokenType::UserName,
270                issued_token_type: UAString::null(),
271                issuer_endpoint_url: UAString::null(),
272                security_policy_uri: Self::user_pass_security_policy_uri(endpoint),
273            });
274        }
275        // X509 policy
276        if endpoint.supports_x509(&config.user_tokens) {
277            user_identity_tokens.push(UserTokenPolicy {
278                policy_id: UAString::from(POLICY_ID_X509),
279                token_type: UserTokenType::Certificate,
280                issued_token_type: UAString::null(),
281                issuer_endpoint_url: UAString::null(),
282                security_policy_uri: UAString::from(SecurityPolicy::Basic128Rsa15.to_uri()),
283            });
284        }
285
286        if user_identity_tokens.is_empty() {
287            debug!(
288                "user_identity_tokens() returned zero endpoints for endpoint {} / {} {}",
289                endpoint.path, endpoint.security_policy, endpoint.security_mode
290            );
291        }
292
293        user_identity_tokens
294    }
295
296    /// Constructs a new endpoint description using the server's info and that in an Endpoint
297    fn new_endpoint_description(
298        &self,
299        config: &ServerConfig,
300        endpoint: &ServerEndpoint,
301        all_fields: bool,
302    ) -> EndpointDescription {
303        let base_endpoint_url = config.base_endpoint_url();
304
305        let user_identity_tokens = self.user_identity_tokens(config, endpoint);
306
307        // CreateSession doesn't need all the endpoint description
308        // and docs say not to bother sending the server and server
309        // certificate info.
310        let (server, server_certificate) = if all_fields {
311            (
312                ApplicationDescription {
313                    application_uri: self.application_uri.clone(),
314                    product_uri: self.product_uri.clone(),
315                    application_name: self.application_name.clone(),
316                    application_type: self.application_type(),
317                    gateway_server_uri: self.gateway_server_uri(),
318                    discovery_profile_uri: UAString::null(),
319                    discovery_urls: self.discovery_urls(),
320                },
321                self.server_certificate_as_byte_string(),
322            )
323        } else {
324            (
325                ApplicationDescription {
326                    application_uri: UAString::null(),
327                    product_uri: UAString::null(),
328                    application_name: LocalizedText::null(),
329                    application_type: self.application_type(),
330                    gateway_server_uri: self.gateway_server_uri(),
331                    discovery_profile_uri: UAString::null(),
332                    discovery_urls: self.discovery_urls(),
333                },
334                ByteString::null(),
335            )
336        };
337
338        EndpointDescription {
339            endpoint_url: endpoint.endpoint_url(&base_endpoint_url).into(),
340            server,
341            server_certificate,
342            security_mode: endpoint.message_security_mode(),
343            security_policy_uri: UAString::from(endpoint.security_policy().to_uri()),
344            user_identity_tokens: Some(user_identity_tokens),
345            transport_profile_uri: UAString::from(profiles::TRANSPORT_PROFILE_URI_BINARY),
346            security_level: endpoint.security_level,
347        }
348    }
349
350    pub fn discovery_urls(&self) -> Option<Vec<UAString>> {
351        let config = trace_read_lock!(self.config);
352        if config.discovery_urls.is_empty() {
353            None
354        } else {
355            Some(config.discovery_urls.iter().map(UAString::from).collect())
356        }
357    }
358
359    pub fn application_type(&self) -> ApplicationType {
360        ApplicationType::Server
361    }
362
363    pub fn gateway_server_uri(&self) -> UAString {
364        UAString::null()
365    }
366
367    pub fn abort(&mut self) {
368        info!("Server has been told to abort");
369        self.abort = true;
370        self.state = ServerStateType::Shutdown;
371    }
372
373    pub fn state(&self) -> ServerStateType {
374        self.state
375    }
376
377    pub fn set_state(&mut self, state: ServerStateType) {
378        self.state = state;
379    }
380
381    pub fn is_abort(&self) -> bool {
382        self.abort
383    }
384
385    pub fn is_running(&self) -> bool {
386        self.state == ServerStateType::Running
387    }
388
389    pub fn server_certificate_as_byte_string(&self) -> ByteString {
390        if let Some(ref server_certificate) = self.server_certificate {
391            server_certificate.as_byte_string()
392        } else {
393            ByteString::null()
394        }
395    }
396
397    pub fn registered_server(&self) -> RegisteredServer {
398        let server_uri = self.application_uri.clone();
399        let product_uri = self.product_uri.clone();
400        let gateway_server_uri = self.gateway_server_uri();
401        let discovery_urls = self.discovery_urls();
402        let server_type = self.application_type();
403        let is_online = self.is_running();
404        let server_names = Some(vec![self.application_name.clone()]);
405        // Server names
406        RegisteredServer {
407            server_uri,
408            product_uri,
409            server_names,
410            server_type,
411            gateway_server_uri,
412            discovery_urls,
413            semaphore_file_path: UAString::null(),
414            is_online,
415        }
416    }
417
418    pub fn create_subscription_id(&mut self) -> u32 {
419        self.last_subscription_id += 1;
420        self.last_subscription_id
421    }
422
423    /// Authenticates access to an endpoint. The endpoint is described by its path, policy, mode and
424    /// the token is supplied in an extension object that must be extracted and authenticated.
425    ///
426    /// It is possible that the endpoint does not exist, or that the token is invalid / unsupported
427    /// or that the token cannot be used with the end point. The return codes reflect the responses
428    /// that ActivateSession would expect from a service call.
429    pub fn authenticate_endpoint(
430        &self,
431        request: &ActivateSessionRequest,
432        endpoint_url: &str,
433        security_policy: SecurityPolicy,
434        security_mode: MessageSecurityMode,
435        user_identity_token: &ExtensionObject,
436        server_nonce: &ByteString,
437    ) -> Result<String, StatusCode> {
438        // Get security from endpoint url
439        let config = trace_read_lock!(self.config);
440
441        if let Some(endpoint) = config.find_endpoint(endpoint_url, security_policy, security_mode) {
442            // Now validate the user identity token
443            match IdentityToken::new(user_identity_token, &self.decoding_options()) {
444                IdentityToken::None => {
445                    error!("User identity token type unsupported");
446                    Err(StatusCode::BadIdentityTokenInvalid)
447                }
448                IdentityToken::AnonymousIdentityToken(token) => {
449                    Self::authenticate_anonymous_token(endpoint, &token)
450                }
451                IdentityToken::UserNameIdentityToken(token) => self
452                    .authenticate_username_identity_token(
453                        &config,
454                        endpoint,
455                        &token,
456                        &self.server_pkey,
457                        server_nonce,
458                    ),
459                IdentityToken::X509IdentityToken(token) => self.authenticate_x509_identity_token(
460                    &config,
461                    endpoint,
462                    &token,
463                    &request.user_token_signature,
464                    &self.server_certificate,
465                    server_nonce,
466                ),
467                IdentityToken::Invalid(o) => {
468                    error!("User identity token type {:?} is unsupported", o.node_id);
469                    Err(StatusCode::BadIdentityTokenInvalid)
470                }
471            }
472        } else {
473            error!("Cannot find endpoint that matches path \"{}\", security policy {:?}, and security mode {:?}", endpoint_url, security_policy, security_mode);
474            Err(StatusCode::BadTcpEndpointUrlInvalid)
475        }
476    }
477
478    pub fn set_register_nodes_callbacks(
479        &mut self,
480        register_nodes_callback: Box<dyn RegisterNodes + Send + Sync>,
481        unregister_nodes_callback: Box<dyn UnregisterNodes + Send + Sync>,
482    ) {
483        self.register_nodes_callback = Some(register_nodes_callback);
484        self.unregister_nodes_callback = Some(unregister_nodes_callback);
485    }
486
487    /// Returns the decoding options of the server
488    pub fn decoding_options(&self) -> DecodingOptions {
489        let config = trace_read_lock!(self.config);
490        config.decoding_options()
491    }
492
493    /// Authenticates an anonymous token, i.e. does the endpoint support anonymous access or not
494    fn authenticate_anonymous_token(
495        endpoint: &ServerEndpoint,
496        token: &AnonymousIdentityToken,
497    ) -> Result<String, StatusCode> {
498        if token.policy_id.as_ref() != POLICY_ID_ANONYMOUS {
499            error!("Token doesn't possess the correct policy id");
500            Err(StatusCode::BadIdentityTokenInvalid)
501        } else if !endpoint.supports_anonymous() {
502            error!(
503                "Endpoint \"{}\" does not support anonymous authentication",
504                endpoint.path
505            );
506            Err(StatusCode::BadIdentityTokenRejected)
507        } else {
508            debug!("Anonymous identity is authenticated");
509            Ok(String::from(crate::config::ANONYMOUS_USER_TOKEN_ID))
510        }
511    }
512
513    /// Authenticates the username identity token with the supplied endpoint. The function returns the user token identifier
514    /// that matches the identity token.
515    fn authenticate_username_identity_token(
516        &self,
517        config: &ServerConfig,
518        endpoint: &ServerEndpoint,
519        token: &UserNameIdentityToken,
520        server_key: &Option<PrivateKey>,
521        server_nonce: &ByteString,
522    ) -> Result<String, StatusCode> {
523        if !endpoint.supports_user_pass(&config.user_tokens) {
524            error!("Endpoint doesn't support username password tokens");
525            Err(StatusCode::BadIdentityTokenRejected)
526        } else if token.policy_id != Self::user_pass_security_policy_id(endpoint) {
527            error!("Token doesn't possess the correct policy id");
528            Err(StatusCode::BadIdentityTokenInvalid)
529        } else if token.user_name.is_null() {
530            error!("User identify token supplies no user name");
531            Err(StatusCode::BadIdentityTokenInvalid)
532        } else {
533            debug!(
534                "policy id = {}, encryption algorithm = {}",
535                token.policy_id.as_ref(),
536                token.encryption_algorithm.as_ref()
537            );
538            let token_password = if !token.encryption_algorithm.is_null() {
539                if let Some(ref server_key) = server_key {
540                    user_identity::decrypt_user_identity_token_password(
541                        token,
542                        server_nonce.as_ref(),
543                        server_key,
544                    )?
545                } else {
546                    error!("Identity token password is encrypted but no server private key was supplied");
547                    return Err(StatusCode::BadIdentityTokenInvalid);
548                }
549            } else {
550                token.plaintext_password()?
551            };
552
553            // Iterate ids in endpoint
554            for user_token_id in &endpoint.user_token_ids {
555                if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
556                    if server_user_token.is_user_pass()
557                        && server_user_token.user == token.user_name.as_ref()
558                    {
559                        // test for empty password
560                        let valid = if server_user_token.pass.is_none() {
561                            // Empty password for user
562                            token_password.is_empty()
563                        } else {
564                            // Password compared as UTF-8 bytes
565                            let server_password =
566                                server_user_token.pass.as_ref().unwrap().as_bytes();
567                            server_password == token_password.as_bytes()
568                        };
569                        if !valid {
570                            error!(
571                                "Cannot authenticate \"{}\", password is invalid",
572                                server_user_token.user
573                            );
574                            return Err(StatusCode::BadUserAccessDenied);
575                        } else {
576                            return Ok(user_token_id.clone());
577                        }
578                    }
579                }
580            }
581            error!(
582                "Cannot authenticate \"{}\", user not found for endpoint",
583                token.user_name
584            );
585            Err(StatusCode::BadUserAccessDenied)
586        }
587    }
588
589    /// Authenticate the x509 token against the endpoint. The function returns the user token identifier
590    /// that matches the identity token.
591    fn authenticate_x509_identity_token(
592        &self,
593        config: &ServerConfig,
594        endpoint: &ServerEndpoint,
595        token: &X509IdentityToken,
596        user_token_signature: &SignatureData,
597        server_certificate: &Option<X509>,
598        server_nonce: &ByteString,
599    ) -> Result<String, StatusCode> {
600        if !endpoint.supports_x509(&config.user_tokens) {
601            error!("Endpoint doesn't support x509 tokens");
602            Err(StatusCode::BadIdentityTokenRejected)
603        } else if token.policy_id.as_ref() != POLICY_ID_X509 {
604            error!("Token doesn't possess the correct policy id");
605            Err(StatusCode::BadIdentityTokenRejected)
606        } else {
607            let result = match server_certificate {
608                Some(ref server_certificate) => {
609                    // Find the security policy used for verifying tokens
610                    let user_identity_tokens = self.user_identity_tokens(config, endpoint);
611                    let security_policy = user_identity_tokens
612                        .iter()
613                        .find(|t| t.token_type == UserTokenType::Certificate)
614                        .map(|t| SecurityPolicy::from_uri(t.security_policy_uri.as_ref()))
615                        .unwrap_or_else(|| endpoint.security_policy());
616
617                    // The security policy has to be something that can encrypt
618                    match security_policy {
619                        SecurityPolicy::Unknown | SecurityPolicy::None => {
620                            Err(StatusCode::BadIdentityTokenInvalid)
621                        }
622                        security_policy => {
623                            // Verify token
624                            user_identity::verify_x509_identity_token(
625                                token,
626                                user_token_signature,
627                                security_policy,
628                                server_certificate,
629                                server_nonce.as_ref(),
630                            )
631                        }
632                    }
633                }
634                None => Err(StatusCode::BadIdentityTokenInvalid),
635            };
636            result.and_then(|_| {
637                // Check the endpoint to see if this token is supported
638                let signing_cert = X509::from_byte_string(&token.certificate_data)?;
639                let signing_thumbprint = signing_cert.thumbprint();
640                for user_token_id in &endpoint.user_token_ids {
641                    if let Some(server_user_token) = config.user_tokens.get(user_token_id) {
642                        if let Some(ref user_thumbprint) = server_user_token.thumbprint {
643                            // The signing cert matches a user's identity, so it is valid
644                            if *user_thumbprint == signing_thumbprint {
645                                return Ok(user_token_id.clone());
646                            }
647                        }
648                    }
649                }
650                Err(StatusCode::BadIdentityTokenInvalid)
651            })
652        }
653    }
654
655    pub fn set_historical_data_provider(
656        &mut self,
657        historical_data_provider: Box<dyn HistoricalDataProvider + Send + Sync>,
658    ) {
659        self.historical_data_provider = Some(historical_data_provider);
660    }
661
662    pub fn set_historical_event_provider(
663        &mut self,
664        historical_event_provider: Box<dyn HistoricalEventProvider + Send + Sync>,
665    ) {
666        self.historical_event_provider = Some(historical_event_provider);
667    }
668
669    pub(crate) fn raise_and_log<T>(&self, event: T) -> Result<NodeId, ()>
670    where
671        T: AuditEvent + Event,
672    {
673        let audit_log = trace_write_lock!(self.audit_log);
674        audit_log.raise_and_log(event)
675    }
676}