Skip to main content

opcua/client/session/
session.rs

1// OPCUA for Rust
2// SPDX-License-Identifier: MPL-2.0
3// Copyright (C) 2017-2022 Adam Lock
4
5//! Session functionality for the current open client connection. This module contains functions
6//! to call for all typically synchronous operations during an OPC UA session.
7//!
8//! The session also has async functionality but that is reserved for publish requests on subscriptions
9//! and events.
10use std::{
11    cmp,
12    collections::HashSet,
13    result::Result,
14    str::FromStr,
15    sync::{mpsc::SyncSender, Arc},
16    thread,
17};
18
19use tokio::{
20    sync::oneshot,
21    time::{interval, Duration, Instant},
22};
23
24use crate::{
25    client::{
26        callbacks::{OnConnectionStatusChange, OnSessionClosed, OnSubscriptionNotification},
27        client::IdentityToken,
28        comms::tcp_transport::TcpTransport,
29        process_service_result, process_unexpected_response,
30        session::{
31            services::*,
32            session_debug, session_error,
33            session_state::{ConnectionState, SessionState},
34            session_trace, session_warn,
35        },
36        session_retry_policy::{Answer, SessionRetryPolicy},
37        subscription::{self, Subscription},
38        subscription_state::SubscriptionState,
39        trust_any_server_cert,
40    },
41    core::{
42        comms::{
43            secure_channel::{Role, SecureChannel},
44            url::*,
45        },
46        supported_message::SupportedMessage,
47        RUNTIME,
48    },
49    crypto::{
50        self as crypto, user_identity::make_user_name_identity_token, CertificateStore,
51        SecurityPolicy, X509,
52    },
53    deregister_runtime_component, register_runtime_component,
54    sync::*,
55    types::{node_ids::ObjectId, status_code::StatusCode, *},
56};
57
58/// Information about the server endpoint, security policy, security mode and user identity that the session will
59/// will use to establish a connection.
60#[derive(Debug)]
61pub struct SessionInfo {
62    /// The endpoint
63    pub endpoint: EndpointDescription,
64    /// User identity token
65    pub user_identity_token: IdentityToken,
66    /// Preferred language locales
67    pub preferred_locales: Vec<String>,
68}
69
70impl Into<SessionInfo> for EndpointDescription {
71    fn into(self) -> SessionInfo {
72        (self, IdentityToken::Anonymous).into()
73    }
74}
75
76impl Into<SessionInfo> for (EndpointDescription, IdentityToken) {
77    fn into(self) -> SessionInfo {
78        SessionInfo {
79            endpoint: self.0,
80            user_identity_token: self.1,
81            preferred_locales: Vec::new(),
82        }
83    }
84}
85
/// A command that can be sent to a running `Session` loop in order to terminate it.
///
/// The common comparison and cloning traits are derived so callers can compare, copy around and
/// assert on commands without having to pattern match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionCommand {
    /// Stop running as soon as possible
    Stop,
}
92
93/// A session of the client. The session is associated with an endpoint and maintains a state
94/// when it is active. The `Session` struct provides functions for all the supported
95/// request types in the API.
96///
97/// Note that not all servers may support all service requests and calling an unsupported API
98/// may cause the connection to be dropped. Your client is expected to know the capabilities of
99/// the server it is calling to avoid this.
100///
101pub struct Session {
102    /// The client application's name.
103    application_description: ApplicationDescription,
104    /// A name for the session, supplied during create
105    session_name: UAString,
106    /// The session connection info.
107    session_info: SessionInfo,
108    /// Runtime state of the session, reset if disconnected.
109    session_state: Arc<RwLock<SessionState>>,
110    /// Subscriptions state.
111    subscription_state: Arc<RwLock<SubscriptionState>>,
112    /// Transport layer.
113    transport: TcpTransport,
114    /// Certificate store.
115    certificate_store: Arc<RwLock<CertificateStore>>,
116    /// Secure channel information.
117    secure_channel: Arc<RwLock<SecureChannel>>,
118    /// Session retry policy.
119    session_retry_policy: Arc<Mutex<SessionRetryPolicy>>,
120    /// Ignore clock skew between the client and the server.
121    ignore_clock_skew: bool,
122    /// Single threaded executor flag (for TCP transport). Unused.
123    single_threaded_executor: bool,
124    /// Tokio runtime
125    runtime: Arc<Mutex<tokio::runtime::Runtime>>,
126}
127
128impl Drop for Session {
129    fn drop(&mut self) {
130        info!("Session has dropped");
131        self.disconnect();
132    }
133}
134
135impl Session {
136    /// Create a new session from the supplied application description, certificate store and session
137    /// information.
138    ///
139    /// # Arguments
140    ///
141    /// * `application_description` - information about the client that will be provided to the server
142    /// * `certificate_store` - certificate management on disk
143    /// * `session_info` - information required to establish a new session.
144    ///
145    /// # Returns
146    ///
147    /// * `Session` - the interface that shall be used to communicate between the client and the server.
148    ///
149    pub(crate) fn new<T>(
150        application_description: ApplicationDescription,
151        session_name: T,
152        certificate_store: Arc<RwLock<CertificateStore>>,
153        session_info: SessionInfo,
154        session_retry_policy: SessionRetryPolicy,
155        decoding_options: DecodingOptions,
156        ignore_clock_skew: bool,
157        single_threaded_executor: bool,
158    ) -> Session
159    where
160        T: Into<UAString>,
161    {
162        let session_name = session_name.into();
163
164        let secure_channel = Arc::new(RwLock::new(SecureChannel::new(
165            certificate_store.clone(),
166            Role::Client,
167            decoding_options,
168        )));
169
170        let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));
171
172        let session_state = Arc::new(RwLock::new(SessionState::new(
173            ignore_clock_skew,
174            secure_channel.clone(),
175            subscription_state.clone(),
176        )));
177
178        let transport = TcpTransport::new(
179            secure_channel.clone(),
180            session_state.clone(),
181            single_threaded_executor,
182        );
183
184        // This runtime is single threaded. The one for the transport may be multi-threaded
185        let runtime = tokio::runtime::Builder::new_current_thread()
186            .enable_all()
187            .build()
188            .unwrap();
189
190        Session {
191            application_description,
192            session_name,
193            session_info,
194            session_state,
195            certificate_store,
196            subscription_state,
197            transport,
198            secure_channel,
199            session_retry_policy: Arc::new(Mutex::new(session_retry_policy)),
200            ignore_clock_skew,
201            single_threaded_executor,
202            runtime: Arc::new(Mutex::new(runtime)),
203        }
204    }
205
206    fn reset(&mut self) {
207        // Clear the existing secure channel state
208        {
209            let mut secure_channel = trace_write_lock!(self.secure_channel);
210            secure_channel.clear_security_token();
211        }
212
213        // Create a new session state
214        self.session_state = Arc::new(RwLock::new(SessionState::new(
215            self.ignore_clock_skew,
216            self.secure_channel.clone(),
217            self.subscription_state.clone(),
218        )));
219
220        // Keep the existing transport, we should never drop a tokio runtime from a sync function
221    }
222
223    /// Connects to the server, creates and activates a session. If there
224    /// is a failure, it will be communicated by the status code in the result.
225    ///
226    /// # Returns
227    ///
228    /// * `Ok(())` - connection has happened and the session is activated
229    /// * `Err(StatusCode)` - reason for failure
230    ///
231    pub fn connect_and_activate(&mut self) -> Result<(), StatusCode> {
232        // Connect now using the session state
233        self.connect()?;
234        self.create_session()?;
235        self.activate_session()?;
236        Ok(())
237    }
238
239    /// Sets the session retry policy that dictates what this session will do if the connection
240    /// fails or goes down. The retry policy enables the session to retry a connection on an
241    /// interval up to a maxmimum number of times.
242    ///
243    /// # Arguments
244    ///
245    /// * `session_retry_policy` - the session retry policy to use
246    ///
247    pub fn set_session_retry_policy(&mut self, session_retry_policy: SessionRetryPolicy) {
248        self.session_retry_policy = Arc::new(Mutex::new(session_retry_policy));
249    }
250
251    /// Register a callback to be notified when the session has been closed.
252    ///
253    /// # Arguments
254    ///
255    /// * `session_closed_callback` - the session closed callback
256    ///
257    pub fn set_session_closed_callback<CB>(&mut self, session_closed_callback: CB)
258    where
259        CB: OnSessionClosed + Send + Sync + 'static,
260    {
261        let mut session_state = trace_write_lock!(self.session_state);
262        session_state.set_session_closed_callback(session_closed_callback);
263    }
264
265    /// Registers a callback to be notified when the session connection status has changed.
266    /// This will be called if connection status changes from connected to disconnected or vice versa.
267    ///
268    /// # Arguments
269    ///
270    /// * `connection_status_callback` - the connection status callback.
271    ///
272    pub fn set_connection_status_callback<CB>(&mut self, connection_status_callback: CB)
273    where
274        CB: OnConnectionStatusChange + Send + Sync + 'static,
275    {
276        let mut session_state = trace_write_lock!(self.session_state);
277        session_state.set_connection_status_callback(connection_status_callback);
278    }
279
280    /// Reconnects to the server and tries to activate the existing session. If there
281    /// is a failure, it will be communicated by the status code in the result. You should not
282    /// call this if there is a session retry policy associated with the session.
283    ///
284    /// Reconnecting will attempt to transfer or recreate subscriptions that were on the old
285    /// session before it terminated.
286    ///
287    /// # Returns
288    ///
289    /// * `Ok(())` - reconnection has happened and the session is activated
290    /// * `Err(StatusCode)` - reason for failure
291    ///
292    pub fn reconnect_and_activate(&mut self) -> Result<(), StatusCode> {
293        // Do nothing if already connected / activated
294        if self.is_connected() {
295            session_error!(
296                self,
297                "Reconnect is going to do nothing because already connected"
298            );
299            Err(StatusCode::BadUnexpectedError)
300        } else {
301            // Reset the session state
302            self.reset();
303
304            // Connect to server (again)
305            self.connect_no_retry()?;
306
307            // Attempt to reactivate the existing session
308            match self.activate_session() {
309                Err(status_code) => {
310                    // Activation didn't work, so create a new session
311                    info!("Session activation failed on reconnect, error = {}, so creating a new session", status_code);
312                    {
313                        let mut session_state = trace_write_lock!(self.session_state);
314                        session_state.reset();
315                    }
316
317                    session_debug!(self, "create_session");
318                    self.create_session()?;
319                    session_debug!(self, "activate_session");
320                    self.activate_session()?;
321                    session_debug!(self, "reconnect should be complete");
322                }
323                Ok(_) => {
324                    info!("Activation succeeded");
325                }
326            }
327            session_debug!(self, "transfer_subscriptions_from_old_session");
328            self.transfer_subscriptions_from_old_session()?;
329            Ok(())
330        }
331    }
332
333    /// This code attempts to take the existing subscriptions created by a previous session and
334    /// either transfer them to this session, or construct them from scratch.
335    fn transfer_subscriptions_from_old_session(&mut self) -> Result<(), StatusCode> {
336        let subscription_state = self.subscription_state.clone();
337
338        let subscription_ids = {
339            let subscription_state = trace_read_lock!(subscription_state);
340            subscription_state.subscription_ids()
341        };
342
343        // Start by getting the subscription ids
344        if let Some(subscription_ids) = subscription_ids {
345            // Try to use TransferSubscriptions to move subscriptions_ids over. If this
346            // works then there is nothing else to do.
347            let mut subscription_ids_to_recreate =
348                subscription_ids.iter().copied().collect::<HashSet<u32>>();
349            if let Ok(transfer_results) = self.transfer_subscriptions(&subscription_ids, true) {
350                session_debug!(self, "transfer_results = {:?}", transfer_results);
351                transfer_results.iter().enumerate().for_each(|(i, r)| {
352                    if r.status_code.is_good() {
353                        // Subscription was transferred so it does not need to be recreated
354                        subscription_ids_to_recreate.remove(&subscription_ids[i]);
355                    }
356                });
357            }
358
359            // But if it didn't work, then some or all subscriptions have to be remade.
360            if !subscription_ids_to_recreate.is_empty() {
361                session_warn!(self, "Some or all of the existing subscriptions could not be transferred and must be created manually");
362            }
363
364            // Now create any subscriptions that could not be transferred
365            subscription_ids_to_recreate
366                .iter()
367                .for_each(|subscription_id| {
368                    info!("Recreating subscription {}", subscription_id);
369                    // Remove the subscription data, create it again from scratch
370                    let deleted_subscription = {
371                        let mut subscription_state = trace_write_lock!(subscription_state);
372                        subscription_state.delete_subscription(*subscription_id)
373                    };
374
375                    if let Some(subscription) = deleted_subscription {
376                        // Attempt to replicate the subscription (subscription id will be new)
377                        if let Ok(subscription_id) = self.create_subscription_inner(
378                            subscription.publishing_interval(),
379                            subscription.lifetime_count(),
380                            subscription.max_keep_alive_count(),
381                            subscription.max_notifications_per_publish(),
382                            subscription.priority(),
383                            subscription.publishing_enabled(),
384                            subscription.notification_callback(),
385                        ) {
386                            info!("New subscription created with id {}", subscription_id);
387
388                            // For each monitored item
389                            let items_to_create = subscription
390                                .monitored_items()
391                                .iter()
392                                .map(|(_, item)| MonitoredItemCreateRequest {
393                                    item_to_monitor: item.item_to_monitor().clone(),
394                                    monitoring_mode: item.monitoring_mode(),
395                                    requested_parameters: MonitoringParameters {
396                                        client_handle: item.client_handle(),
397                                        sampling_interval: item.sampling_interval(),
398                                        filter: ExtensionObject::null(),
399                                        queue_size: item.queue_size() as u32,
400                                        discard_oldest: true,
401                                    },
402                                })
403                                .collect::<Vec<MonitoredItemCreateRequest>>();
404                            let _ = self.create_monitored_items(
405                                subscription_id,
406                                TimestampsToReturn::Both,
407                                &items_to_create,
408                            );
409
410                            // Recreate any triggers for the monitored item. This code assumes monitored item
411                            // ids are the same value as they were in the previous subscription.
412                            subscription.monitored_items().iter().for_each(|(_, item)| {
413                                let triggered_items = item.triggered_items();
414                                if !triggered_items.is_empty() {
415                                    let links_to_add =
416                                        triggered_items.iter().copied().collect::<Vec<u32>>();
417                                    let _ = self.set_triggering(
418                                        subscription_id,
419                                        item.id(),
420                                        links_to_add.as_slice(),
421                                        &[],
422                                    );
423                                }
424                            });
425                        } else {
426                            session_warn!(
427                                self,
428                                "Could not create a subscription from the existing subscription {}",
429                                subscription_id
430                            );
431                        }
432                    } else {
433                        panic!(
434                            "Subscription {}, doesn't exist although it should",
435                            subscription_id
436                        );
437                    }
438                });
439        }
440        Ok(())
441    }
442
443    /// Connects to the server using the retry policy to repeat connecting until such time as it
444    /// succeeds or the policy says to give up. If there is a failure, it will be
445    /// communicated by the status code in the result.
446    pub fn connect(&self) -> Result<(), StatusCode> {
447        loop {
448            match self.connect_no_retry() {
449                Ok(_) => {
450                    info!("Connect was successful");
451                    let mut session_retry_policy = trace_lock!(self.session_retry_policy);
452                    session_retry_policy.reset_retry_count();
453                    return Ok(());
454                }
455                Err(status_code) => {
456                    self.disconnect();
457                    let mut session_retry_policy = trace_lock!(self.session_retry_policy);
458                    session_retry_policy.increment_retry_count();
459                    session_warn!(
460                        self,
461                        "Connect was unsuccessful, error = {}, retries = {}",
462                        status_code,
463                        session_retry_policy.retry_count()
464                    );
465
466                    match session_retry_policy.should_retry_connect(DateTime::now()) {
467                        Answer::GiveUp => {
468                            session_error!(self, "Session has given up trying to connect to the server after {} retries", session_retry_policy.retry_count());
469                            return Err(StatusCode::BadNotConnected);
470                        }
471                        Answer::Retry => {
472                            info!("Retrying to connect to server...");
473                            session_retry_policy.set_last_attempt(DateTime::now());
474                        }
475                        Answer::WaitFor(sleep_for) => {
476                            // Sleep for the instructed interval before looping around and trying
477                            // once more.
478                            thread::sleep(Duration::from_millis(sleep_for as u64));
479                        }
480                    }
481                }
482            }
483        }
484    }
485
486    /// Connects to the server using the configured session arguments. No attempt is made to retry
487    /// the connection if the attempt fails. If there is a failure, it will be communicated by the
488    /// status code in the result.
489    ///
490    /// # Returns
491    ///
492    /// * `Ok(())` - connection has happened
493    /// * `Err(StatusCode)` - reason for failure
494    ///
495    pub fn connect_no_retry(&self) -> Result<(), StatusCode> {
496        let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
497        info!("Connect");
498        let security_policy =
499            SecurityPolicy::from_str(self.session_info.endpoint.security_policy_uri.as_ref())
500                .unwrap();
501        if security_policy == SecurityPolicy::Unknown {
502            session_error!(
503                self,
504                "connect, security policy \"{}\" is unknown",
505                self.session_info.endpoint.security_policy_uri.as_ref()
506            );
507            Err(StatusCode::BadSecurityPolicyRejected)
508        } else {
509            let (cert, key) = {
510                let certificate_store = trace_write_lock!(self.certificate_store);
511                certificate_store.read_own_cert_and_pkey_optional()
512            };
513
514            {
515                let mut secure_channel = trace_write_lock!(self.secure_channel);
516                secure_channel.set_private_key(key);
517                secure_channel.set_cert(cert);
518                secure_channel.set_security_policy(security_policy);
519                secure_channel.set_security_mode(self.session_info.endpoint.security_mode);
520                let _ = secure_channel.set_remote_cert_from_byte_string(
521                    &self.session_info.endpoint.server_certificate,
522                );
523                info!("Security policy = {:?}", security_policy);
524                info!(
525                    "Security mode = {:?}",
526                    self.session_info.endpoint.security_mode
527                );
528            }
529
530            // Transport's tokio runtime is made here, not in transport
531            self.transport.connect(endpoint_url.as_ref())?;
532            self.open_secure_channel()?;
533            self.on_connection_status_change(true);
534            Ok(())
535        }
536    }
537
538    pub(crate) fn session_state(&self) -> Arc<RwLock<SessionState>> {
539        self.session_state.clone()
540    }
541
542    /// Disconnect from the server. Disconnect is an explicit command to drop the socket and throw
543    /// away all state information. If you disconnect you cannot reconnect to your existing session
544    /// or retrieve any existing subscriptions.
545    pub fn disconnect(&self) {
546        if self.is_connected() {
547            let _ = self.close_session_and_delete_subscriptions();
548            let _ = self.close_secure_channel();
549
550            {
551                let session_state = trace_read_lock!(self.session_state);
552                session_state.quit();
553            }
554
555            self.transport.wait_for_disconnect();
556            self.on_connection_status_change(false);
557        }
558    }
559
560    /// Test if the session is in a connected state
561    ///
562    /// # Returns
563    ///
564    /// * `true` - Session is connected
565    /// * `false` - Session is not connected
566    ///
567    pub fn is_connected(&self) -> bool {
568        self.transport.is_connected()
569    }
570
571    /// Internal constant for the sleep interval used during polling
572    const POLL_SLEEP_INTERVAL: u64 = 10;
573
574    /// Synchronously runs a polling loop over the supplied session. Running a session performs
575    /// periodic actions such as receiving messages, processing subscriptions, and recovering from
576    /// connection errors. The run function will return if the session is disconnected and
577    /// cannot be reestablished.
578    ///
579    /// # Arguments
580    ///
581    /// * `session` - the session to run ynchronously
582    ///
583    pub fn run(session: Arc<RwLock<Session>>) {
584        let (_tx, rx) = oneshot::channel();
585        Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
586    }
587
588    /// Runs the session asynchronously on a new thread. The function returns immediately
589    /// and gives a caller a `Sender` that can be used to send a message to the session
590    /// to cause it to terminate. Do not drop this sender (i.e. make sure to bind it to a variable with
591    /// sufficient lifetime) or the session will terminate as soon as you do.
592    ///
593    /// Running a session performs periodic actions such as receiving messages, processing subscriptions,
594    /// and recovering from.  connection errors. The session will terminate by itself if it is disconnected
595    /// and cannot be reestablished. It will terminate if the sender is dropped or if sent a ClientCommand
596    /// to terminate.  caller to this function can monitor the status of the session through state
597    /// calls to know when this happens.
598    ///
599    ///
600    /// # Arguments
601    ///
602    /// * `session` - the session to run asynchronously
603    ///
604    /// # Returns
605    ///
606    /// * `oneshot::Sender<ClientCommand>` - A sender that allows the caller to send a message to the
607    ///                        run loop to cause it to stop. Note that dropping the sender, i.e. not binding it to
608    ///                        a variable will also cause the loop to stop.
609    ///
610    pub fn run_async(session: Arc<RwLock<Session>>) -> oneshot::Sender<SessionCommand> {
611        let (tx, rx) = oneshot::channel();
612        thread::spawn(move || Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx));
613        tx
614    }
615
616    /// The asynchronous main session loop. This is the function that processes responses and
617    /// keeps the session alive. Note that while the client normally calls `run()` or `run_loop()`
618    /// to invoke this, there may be situations where the client wishes to directly use this
619    /// function, for example if the client has its own Tokio runtime and prefers to spawn the task
620    /// with that.
621    pub async fn session_task(
622        session: Arc<RwLock<Session>>,
623        sleep_interval: u64,
624        rx: oneshot::Receiver<SessionCommand>,
625    ) {
626        tokio::select! {
627            _ = async {
628                let mut timer = interval(Duration::from_millis(sleep_interval));
629                loop {
630                    // Poll the session.
631                    let poll_result = {
632                        let mut session = session.write();
633                        session.poll().await
634                    };
635                    match poll_result {
636                        Ok(did_something) => {
637                            // If the session did nothing, then sleep for a moment to save some CPU
638                            if !did_something {
639                                timer.tick().await;
640                            }
641                        }
642                        Err(_) => {
643                            // Break the loop if connection goes down
644                            info!("Run session connection to server broke, so terminating");
645                            break;
646                        }
647                    }
648                }
649            } => {}
650            message = rx => {
651                if let Ok(message) = message {
652                    // Only message is a Quit command so no point even testing what it is.
653                    info!("Run session was terminated by a message {:?}", message);
654                }
655                else {
656                    warn!("Run session was terminated, presumably by caller dropping oneshot sender. Don't do that unless you meant to.");
657                }
658            }
659        }
660    }
661
662    /// The main running loop for a session. This is used by `run()` and `run_async()` to run
663    /// continuously until a signal is received to terminate.
664    ///
665    /// # Arguments
666    ///
667    /// * `session`   - The session
668    /// * `sleep_interval` - An internal polling timer in ms
669    /// * `rx`        - A receiver that the task uses to receive a quit command directly from the caller.
670    ///
671    pub fn run_loop(
672        session: Arc<RwLock<Session>>,
673        sleep_interval: u64,
674        rx: oneshot::Receiver<SessionCommand>,
675    ) {
676        let task = {
677            let session = session.clone();
678            async move {
679                Self::session_task(session, sleep_interval, rx).await;
680            }
681        };
682        // Spawn the task on the alloted runtime
683        let runtime = {
684            let session = trace_read_lock!(session);
685            session.runtime.clone()
686        };
687        let runtime = trace_lock!(runtime);
688        runtime.block_on(task);
689    }
690
691    /// Polls on the session which basically dispatches any pending
692    /// async responses, attempts to reconnect if the client is disconnected from the client and
693    /// sleeps a little bit if nothing needed to be done.
694    ///
695    /// # Arguments
696    ///
697    /// * `sleep_for` - the period of time in milliseconds that poll should sleep for if it performed
698    ///                 no action.
699    ///
700    /// # Returns
701    ///
702    /// * `true` - if an action was performed during the poll
703    /// * `false` - if no action was performed during the poll and the poll slept
704    ///
705    pub async fn poll(&mut self) -> Result<bool, ()> {
706        let did_something = if self.is_connected() {
707            let mut session_state = trace_write_lock!(self.session_state);
708            session_state.handle_publish_responses()
709        } else {
710            let should_retry_connect = {
711                let session_retry_policy = trace_lock!(self.session_retry_policy);
712                session_retry_policy.should_retry_connect(DateTime::now())
713            };
714            match should_retry_connect {
715                Answer::GiveUp => {
716                    let session_retry_policy = trace_lock!(self.session_retry_policy);
717                    session_error!(
718                        self,
719                        "Session has given up trying to reconnect to the server after {} retries",
720                        session_retry_policy.retry_count()
721                    );
722                    return Err(());
723                }
724                Answer::Retry => {
725                    info!("Retrying to reconnect to server...");
726                    {
727                        let mut session_retry_policy = trace_lock!(self.session_retry_policy);
728                        session_retry_policy.set_last_attempt(DateTime::now());
729                    }
730                    if self.reconnect_and_activate().is_ok() {
731                        info!("Retry to connect was successful");
732                        let mut session_retry_policy = trace_lock!(self.session_retry_policy);
733                        session_retry_policy.reset_retry_count();
734                    } else {
735                        let mut session_retry_policy = trace_lock!(self.session_retry_policy);
736                        session_retry_policy.increment_retry_count();
737                        session_warn!(
738                            self,
739                            "Reconnect was unsuccessful, retries = {}",
740                            session_retry_policy.retry_count()
741                        );
742                        drop(session_retry_policy);
743                        self.disconnect();
744                    }
745                    true
746                }
747                Answer::WaitFor(_) => {
748                    // Note we could sleep for the interval in the WaitFor(), but the poll() sleeps
749                    // anyway so it probably makes no odds.
750                    false
751                }
752            }
753        };
754        Ok(did_something)
755    }
756
757    /// Start a task that will periodically "ping" the server to keep the session alive. The ping rate
758    /// will be 3/4 the session timeout rate.
759    ///
760    /// NOTE: This code assumes that the session_timeout period never changes, e.g. if you
761    /// connected to a server, negotiate a timeout period and then for whatever reason need to
762    /// reconnect to that same server, you will receive the same timeout. If you get a different
763    /// timeout then this code will not care and will continue to ping at the original rate.
764    fn spawn_session_activity_task(&self, session_timeout: f64) {
765        session_debug!(self, "spawn_session_activity_task({})", session_timeout);
766
767        let connection_state = {
768            let session_state = trace_read_lock!(self.session_state);
769            session_state.connection_state()
770        };
771
772        let session_state = self.session_state.clone();
773
774        // Session activity will happen every 3/4 of the timeout period
775        const MIN_SESSION_ACTIVITY_MS: u64 = 1000;
776        let session_activity = cmp::max((session_timeout as u64 / 4) * 3, MIN_SESSION_ACTIVITY_MS);
777        session_debug!(
778            self,
779            "session timeout is {}, activity timer is {}",
780            session_timeout,
781            session_activity
782        );
783
784        let id = format!("session-activity-thread-{:?}", thread::current().id());
785        let runtime = trace_lock!(self.runtime);
786        runtime.spawn(async move {
787            register_runtime_component!(&id);
788            // The timer runs at a higher frequency timer loop to terminate as soon after the session
789            // state has terminated. Each time it runs it will test if the interval has elapsed or not.
790            let session_activity_interval = Duration::from_millis(session_activity);
791            let mut timer = interval(Duration::from_millis(MIN_SESSION_ACTIVITY_MS));
792            let mut last_timeout = Instant::now();
793
794            loop {
795                timer.tick().await;
796
797                if connection_state.is_finished() {
798                    info!("Session activity timer is terminating");
799                    break;
800                }
801
802                // Get the time now
803                let now = Instant::now();
804
805                // Calculate to interval since last check
806                let interval = now - last_timeout;
807                if interval > session_activity_interval {
808                    match connection_state.state() {
809                        ConnectionState::Processing => {
810                            info!("Session activity keep-alive request");
811                            let mut session_state = trace_write_lock!(session_state);
812                            let request_header = session_state.make_request_header();
813                            let request = ReadRequest {
814                                request_header,
815                                max_age: 1f64,
816                                timestamps_to_return: TimestampsToReturn::Server,
817                                nodes_to_read: Some(vec![]),
818                            };
819                            // The response to this is ignored
820                            let _ = session_state.async_send_request(request, None);
821                        }
822                        connection_state => {
823                            info!("Session activity keep-alive is doing nothing - connection state = {:?}", connection_state);
824                        }
825                    };
826                    last_timeout = now;
827                }
828            }
829
830            info!("Session activity timer task is finished");
831            deregister_runtime_component!(&id);
832        });
833    }
834
835    /// Start a task that will periodically send a publish request to keep the subscriptions alive.
836    /// The request rate will be 3/4 of the shortest (revised publishing interval * the revised keep
837    /// alive count) of all subscriptions that belong to a single session.
838    fn spawn_subscription_activity_task(&self) {
839        session_debug!(self, "spawn_subscription_activity_task",);
840
841        let connection_state = {
842            let session_state = trace_read_lock!(self.session_state);
843            session_state.connection_state()
844        };
845
846        const MIN_SUBSCRIPTION_ACTIVITY_MS: u64 = 1000;
847        let session_state = self.session_state.clone();
848        let subscription_state = self.subscription_state.clone();
849
850        let id = format!("subscription-activity-thread-{:?}", thread::current().id());
851        let runtime = trace_lock!(self.runtime);
852        runtime.spawn(async move {
853            register_runtime_component!(&id);
854
855            // The timer runs at a higher frequency timer loop to terminate as soon after the session
856            // state has terminated. Each time it runs it will test if the interval has elapsed or not.
857            let mut timer = interval(Duration::from_millis(MIN_SUBSCRIPTION_ACTIVITY_MS));
858
859            let mut last_timeout: Instant;
860            let mut subscription_activity_interval: Duration;
861
862            loop {
863                timer.tick().await;
864
865                if connection_state.is_finished() {
866                    info!("Session activity timer is terminating");
867                    break;
868                }
869
870                if let (Some(keep_alive_timeout), last_publish_request) = {
871                    let subscription_state = trace_read_lock!(subscription_state);
872                    (
873                        subscription_state.keep_alive_timeout(),
874                        subscription_state.last_publish_request(),
875                    )
876                } {
877                    subscription_activity_interval =
878                        Duration::from_millis((keep_alive_timeout / 4) * 3);
879                    last_timeout = last_publish_request;
880
881                    // Get the time now
882                    let now = Instant::now();
883
884                    // Calculate to interval since last check
885                    let interval = now - last_timeout;
886                    if interval > subscription_activity_interval {
887                        let mut session_state = trace_write_lock!(session_state);
888                        let _ = session_state.async_publish();
889                    }
890                }
891            }
892
893            info!("Subscription activity timer task is finished");
894            deregister_runtime_component!(&id);
895        });
896    }
897
898    /// This is the internal handler for create subscription that receives the callback wrapped up and reference counted.
899    fn create_subscription_inner(
900        &self,
901        publishing_interval: f64,
902        lifetime_count: u32,
903        max_keep_alive_count: u32,
904        max_notifications_per_publish: u32,
905        priority: u8,
906        publishing_enabled: bool,
907        callback: Arc<Mutex<dyn OnSubscriptionNotification + Send + Sync + 'static>>,
908    ) -> Result<u32, StatusCode> {
909        let request = CreateSubscriptionRequest {
910            request_header: self.make_request_header(),
911            requested_publishing_interval: publishing_interval,
912            requested_lifetime_count: lifetime_count,
913            requested_max_keep_alive_count: max_keep_alive_count,
914            max_notifications_per_publish,
915            publishing_enabled,
916            priority,
917        };
918        let response = self.send_request(request)?;
919        if let SupportedMessage::CreateSubscriptionResponse(response) = response {
920            process_service_result(&response.response_header)?;
921            let subscription = Subscription::new(
922                response.subscription_id,
923                response.revised_publishing_interval,
924                response.revised_lifetime_count,
925                response.revised_max_keep_alive_count,
926                max_notifications_per_publish,
927                publishing_enabled,
928                priority,
929                callback,
930            );
931
932            // Add the new subscription to the subscription state
933            {
934                let mut subscription_state = trace_write_lock!(self.subscription_state);
935                subscription_state.add_subscription(subscription);
936            }
937
938            // Send an async publish request for this new subscription
939            {
940                let mut session_state = trace_write_lock!(self.session_state);
941                let _ = session_state.async_publish();
942            }
943
944            session_debug!(
945                self,
946                "create_subscription, created a subscription with id {}",
947                response.subscription_id
948            );
949            Ok(response.subscription_id)
950        } else {
951            session_error!(self, "create_subscription failed {:?}", response);
952            Err(process_unexpected_response(response))
953        }
954    }
955
956    /// Deletes all subscriptions by sending a [`DeleteSubscriptionsRequest`] to the server with
957    /// ids for all subscriptions.
958    ///
959    /// # Returns
960    ///
961    /// * `Ok(Vec<(u32, StatusCode)>)` - List of (id, status code) result for delete action on each id, `Good` or `BadSubscriptionIdInvalid`
962    /// * `Err(StatusCode)` - Status code reason for failure
963    ///
964    /// [`DeleteSubscriptionsRequest`]: ./struct.DeleteSubscriptionsRequest.html
965    ///
966    pub fn delete_all_subscriptions(&self) -> Result<Vec<(u32, StatusCode)>, StatusCode> {
967        let subscription_ids = {
968            let subscription_state = trace_read_lock!(self.subscription_state);
969            subscription_state.subscription_ids()
970        };
971        if let Some(ref subscription_ids) = subscription_ids {
972            let status_codes = self.delete_subscriptions(subscription_ids.as_slice())?;
973            // Return a list of (id, status_code) for each subscription
974            Ok(subscription_ids
975                .iter()
976                .zip(status_codes)
977                .map(|(id, status_code)| (*id, status_code))
978                .collect())
979        } else {
980            // No subscriptions
981            session_trace!(
982                self,
983                "delete_all_subscriptions, called when there are no subscriptions"
984            );
985            Err(StatusCode::BadNothingToDo)
986        }
987    }
988
989    /// Closes the session and deletes all subscriptions
990    ///
991    /// # Returns
992    ///
993    /// * `Ok(())` - if the session was closed
994    /// * `Err(StatusCode)` - Status code reason for failure
995    ///
996    /// [`CloseSessionRequest`]: ./struct.CloseSessionRequest.html
997    ///
998    pub fn close_session_and_delete_subscriptions(&self) -> Result<(), StatusCode> {
999        if !self.is_connected() {
1000            return Err(StatusCode::BadNotConnected);
1001        }
1002        // for some operations like enumerating endpoints, there is no session equivalent
1003        // on the server and it's a local helper object, only. In that case: nothing to do.
1004        if trace_read_lock!(self.session_state).session_id().identifier == Identifier::Numeric(0) {
1005            return Ok(());
1006        }
1007        let request = CloseSessionRequest {
1008            delete_subscriptions: true,
1009            request_header: self.make_request_header(),
1010        };
1011        let response = self.send_request(request)?;
1012        if let SupportedMessage::CloseSessionResponse(_) = response {
1013            let mut subscription_state = trace_write_lock!(self.subscription_state);
1014            if let Some(subscription_ids) = subscription_state.subscription_ids() {
1015                for subscription_id in subscription_ids {
1016                    subscription_state.delete_subscription(subscription_id);
1017                }
1018            }
1019            Ok(())
1020        } else {
1021            session_error!(self, "close_session failed {:?}", response);
1022            Err(process_unexpected_response(response))
1023        }
1024    }
1025
1026    /// Returns the subscription state object
1027    pub fn subscription_state(&self) -> Arc<RwLock<SubscriptionState>> {
1028        self.subscription_state.clone()
1029    }
1030
1031    /// Returns a string identifier for the session
1032    pub(crate) fn session_id(&self) -> String {
1033        let session_state = self.session_state();
1034        let session_state = session_state.read();
1035        format!("session:{}", session_state.id())
1036    }
1037
1038    /// Notify any callback of the connection status change
1039    fn on_connection_status_change(&self, connected: bool) {
1040        let mut session_state = trace_write_lock!(self.session_state);
1041        session_state.on_connection_status_change(connected);
1042    }
1043
1044    /// Returns the security policy
1045    fn security_policy(&self) -> SecurityPolicy {
1046        let secure_channel = trace_read_lock!(self.secure_channel);
1047        secure_channel.security_policy()
1048    }
1049
1050    // Test if the subscription by id exists
1051    fn subscription_exists(&self, subscription_id: u32) -> bool {
1052        let subscription_state = trace_read_lock!(self.subscription_state);
1053        subscription_state.subscription_exists(subscription_id)
1054    }
1055
1056    // Creates a user identity token according to the endpoint, policy that the client is currently connected to the
1057    // server with.
1058    fn user_identity_token(
1059        &self,
1060        server_cert: &Option<X509>,
1061        server_nonce: &[u8],
1062    ) -> Result<(ExtensionObject, SignatureData), StatusCode> {
1063        let user_identity_token = &self.session_info.user_identity_token;
1064        let user_token_type = match user_identity_token {
1065            IdentityToken::Anonymous => UserTokenType::Anonymous,
1066            IdentityToken::UserName(_, _) => UserTokenType::UserName,
1067            IdentityToken::X509(_, _) => UserTokenType::Certificate,
1068        };
1069
1070        let endpoint = &self.session_info.endpoint;
1071        let policy = endpoint.find_policy(user_token_type);
1072        session_debug!(self, "Endpoint policy = {:?}", policy);
1073
1074        // Return the result
1075        match policy {
1076            None => {
1077                session_error!(
1078                    self,
1079                    "Cannot find user token type {:?} for this endpoint, cannot connect",
1080                    user_token_type
1081                );
1082                Err(StatusCode::BadSecurityPolicyRejected)
1083            }
1084            Some(policy) => {
1085                let security_policy = if policy.security_policy_uri.is_null() {
1086                    // Assume None
1087                    SecurityPolicy::None
1088                } else {
1089                    SecurityPolicy::from_uri(policy.security_policy_uri.as_ref())
1090                };
1091                if security_policy == SecurityPolicy::Unknown {
1092                    session_error!(
1093                        self,
1094                        "Can't support the security policy {}",
1095                        policy.security_policy_uri
1096                    );
1097                    Err(StatusCode::BadSecurityPolicyRejected)
1098                } else {
1099                    match user_identity_token {
1100                        IdentityToken::Anonymous => {
1101                            let identity_token = AnonymousIdentityToken {
1102                                policy_id: policy.policy_id.clone(),
1103                            };
1104                            let identity_token = ExtensionObject::from_encodable(
1105                                ObjectId::AnonymousIdentityToken_Encoding_DefaultBinary,
1106                                &identity_token,
1107                            );
1108                            Ok((identity_token, SignatureData::null()))
1109                        }
1110                        IdentityToken::UserName(ref user, ref pass) => {
1111                            let secure_channel = trace_read_lock!(self.secure_channel);
1112                            let identity_token = self.make_user_name_identity_token(
1113                                &secure_channel,
1114                                policy,
1115                                user,
1116                                pass,
1117                            )?;
1118                            let identity_token = ExtensionObject::from_encodable(
1119                                ObjectId::UserNameIdentityToken_Encoding_DefaultBinary,
1120                                &identity_token,
1121                            );
1122                            Ok((identity_token, SignatureData::null()))
1123                        }
1124                        IdentityToken::X509(ref cert_path, ref private_key_path) => {
1125                            if let Some(ref server_cert) = server_cert {
1126                                // The cert will be supplied to the server along with a signature to prove we have the private key to go with the cert
1127                                let certificate_data = CertificateStore::read_cert(cert_path)
1128                                    .map_err(|e| {
1129                                        session_error!(
1130                                            self,
1131                                            "Certificate cannot be loaded from path {}, error = {}",
1132                                            cert_path.to_str().unwrap(),
1133                                            e
1134                                        );
1135                                        StatusCode::BadSecurityPolicyRejected
1136                                    })?;
1137                                let private_key = CertificateStore::read_pkey(private_key_path)
1138                                    .map_err(|e| {
1139                                        session_error!(
1140                                            self,
1141                                            "Private key cannot be loaded from path {}, error = {}",
1142                                            private_key_path.to_str().unwrap(),
1143                                            e
1144                                        );
1145                                        StatusCode::BadSecurityPolicyRejected
1146                                    })?;
1147
1148                                // Create a signature using the X509 private key to sign the server's cert and nonce
1149                                let user_token_signature = crypto::create_signature_data(
1150                                    &private_key,
1151                                    security_policy,
1152                                    &server_cert.as_byte_string(),
1153                                    &ByteString::from(server_nonce),
1154                                )?;
1155
1156                                // Create identity token
1157                                let identity_token = X509IdentityToken {
1158                                    policy_id: policy.policy_id.clone(),
1159                                    certificate_data: certificate_data.as_byte_string(),
1160                                };
1161                                let identity_token = ExtensionObject::from_encodable(
1162                                    ObjectId::X509IdentityToken_Encoding_DefaultBinary,
1163                                    &identity_token,
1164                                );
1165
1166                                Ok((identity_token, user_token_signature))
1167                            } else {
1168                                session_error!(self, "Cannot create an X509IdentityToken because the remote server has no cert with which to create a signature");
1169                                Err(StatusCode::BadCertificateInvalid)
1170                            }
1171                        }
1172                    }
1173                }
1174            }
1175        }
1176    }
1177
1178    /// Create a filled in UserNameIdentityToken by using the endpoint's token policy, the current
1179    /// secure channel information and the user name and password.
1180    fn make_user_name_identity_token(
1181        &self,
1182        secure_channel: &SecureChannel,
1183        user_token_policy: &UserTokenPolicy,
1184        user: &str,
1185        pass: &str,
1186    ) -> Result<UserNameIdentityToken, StatusCode> {
1187        let channel_security_policy = secure_channel.security_policy();
1188        let nonce = secure_channel.remote_nonce();
1189        let cert = secure_channel.remote_cert();
1190        make_user_name_identity_token(
1191            channel_security_policy,
1192            user_token_policy,
1193            nonce,
1194            &cert,
1195            user,
1196            pass,
1197        )
1198    }
1199}
1200
1201impl Service for Session {
1202    /// Construct a request header for the session. All requests after create session are expected
1203    /// to supply an authentication token.
1204    fn make_request_header(&self) -> RequestHeader {
1205        let mut session_state = trace_write_lock!(self.session_state);
1206        session_state.make_request_header()
1207    }
1208
1209    /// Synchronously sends a request. The return value is the response to the request
1210    fn send_request<T>(&self, request: T) -> Result<SupportedMessage, StatusCode>
1211    where
1212        T: Into<SupportedMessage>,
1213    {
1214        let mut session_state = trace_write_lock!(self.session_state);
1215        session_state.send_request(request)
1216    }
1217
1218    // Asynchronously sends a request. The return value is the request handle of the request
1219    fn async_send_request<T>(
1220        &self,
1221        request: T,
1222        sender: Option<SyncSender<SupportedMessage>>,
1223    ) -> Result<u32, StatusCode>
1224    where
1225        T: Into<SupportedMessage>,
1226    {
1227        let mut session_state = trace_write_lock!(self.session_state);
1228        session_state.async_send_request(request, sender)
1229    }
1230}
1231
1232impl DiscoveryService for Session {
1233    fn find_servers<T>(&self, endpoint_url: T) -> Result<Vec<ApplicationDescription>, StatusCode>
1234    where
1235        T: Into<UAString>,
1236    {
1237        let request = FindServersRequest {
1238            request_header: self.make_request_header(),
1239            endpoint_url: endpoint_url.into(),
1240            locale_ids: None,
1241            server_uris: None,
1242        };
1243        let response = self.send_request(request)?;
1244        if let SupportedMessage::FindServersResponse(response) = response {
1245            process_service_result(&response.response_header)?;
1246            let servers = if let Some(servers) = response.servers {
1247                servers
1248            } else {
1249                Vec::new()
1250            };
1251            Ok(servers)
1252        } else {
1253            Err(process_unexpected_response(response))
1254        }
1255    }
1256
1257    fn get_endpoints(&self) -> Result<Vec<EndpointDescription>, StatusCode> {
1258        session_debug!(self, "get_endpoints");
1259        let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
1260
1261        let request = GetEndpointsRequest {
1262            request_header: self.make_request_header(),
1263            endpoint_url,
1264            locale_ids: None,
1265            profile_uris: None,
1266        };
1267
1268        let response = self.send_request(request)?;
1269        if let SupportedMessage::GetEndpointsResponse(response) = response {
1270            process_service_result(&response.response_header)?;
1271            match response.endpoints {
1272                None => {
1273                    session_debug!(self, "get_endpoints, success but no endpoints");
1274                    Ok(Vec::new())
1275                }
1276                Some(endpoints) => {
1277                    session_debug!(self, "get_endpoints, success");
1278                    Ok(endpoints)
1279                }
1280            }
1281        } else {
1282            session_error!(self, "get_endpoints failed {:?}", response);
1283            Err(process_unexpected_response(response))
1284        }
1285    }
1286
1287    fn register_server(&self, server: RegisteredServer) -> Result<(), StatusCode> {
1288        let request = RegisterServerRequest {
1289            request_header: self.make_request_header(),
1290            server,
1291        };
1292        let response = self.send_request(request)?;
1293        if let SupportedMessage::RegisterServerResponse(response) = response {
1294            process_service_result(&response.response_header)?;
1295            Ok(())
1296        } else {
1297            Err(process_unexpected_response(response))
1298        }
1299    }
1300}
1301
1302impl SecureChannelService for Session {
1303    fn open_secure_channel(&self) -> Result<(), StatusCode> {
1304        session_debug!(self, "open_secure_channel");
1305        let mut session_state = trace_write_lock!(self.session_state);
1306        session_state.issue_or_renew_secure_channel(SecurityTokenRequestType::Issue)
1307    }
1308
1309    fn close_secure_channel(&self) -> Result<(), StatusCode> {
1310        let request = CloseSecureChannelRequest {
1311            request_header: self.make_request_header(),
1312        };
1313        // We do not wait for a response because there may not be one. Just return
1314        let _ = self.async_send_request(request, None);
1315        Ok(())
1316    }
1317}
1318
1319impl SessionService for Session {
1320    fn create_session(&self) -> Result<NodeId, StatusCode> {
1321        // Get some state stuff
1322        let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
1323
1324        let client_nonce = {
1325            let secure_channel = trace_read_lock!(self.secure_channel);
1326            secure_channel.local_nonce_as_byte_string()
1327        };
1328
1329        let server_uri = UAString::null();
1330        let session_name = self.session_name.clone();
1331
1332        let (client_certificate, _) = {
1333            let certificate_store = trace_write_lock!(self.certificate_store);
1334            certificate_store.read_own_cert_and_pkey_optional()
1335        };
1336
1337        // Security
1338        let client_certificate = if let Some(ref client_certificate) = client_certificate {
1339            client_certificate.as_byte_string()
1340        } else {
1341            ByteString::null()
1342        };
1343
1344        // Requested session timeout should be larger than your expected subscription rate.
1345        let requested_session_timeout = {
1346            let session_retry_policy = trace_lock!(self.session_retry_policy);
1347            session_retry_policy.session_timeout()
1348        };
1349
1350        let request = CreateSessionRequest {
1351            request_header: self.make_request_header(),
1352            client_description: self.application_description.clone(),
1353            server_uri,
1354            endpoint_url,
1355            session_name,
1356            client_nonce,
1357            client_certificate,
1358            requested_session_timeout,
1359            max_response_message_size: 0,
1360        };
1361
1362        session_debug!(self, "CreateSessionRequest = {:?}", request);
1363
1364        let response = self.send_request(request)?;
1365        if let SupportedMessage::CreateSessionResponse(response) = response {
1366            process_service_result(&response.response_header)?;
1367
1368            let session_id = {
1369                let mut session_state = trace_write_lock!(self.session_state);
1370                session_state.set_session_id(response.session_id.clone());
1371                session_state.set_authentication_token(response.authentication_token.clone());
1372                {
1373                    let mut secure_channel = trace_write_lock!(self.secure_channel);
1374                    let _ =
1375                        secure_channel.set_remote_nonce_from_byte_string(&response.server_nonce);
1376                    let _ = secure_channel
1377                        .set_remote_cert_from_byte_string(&response.server_certificate);
1378                }
1379                // When ignoring clock skew, we calculate the time offset between the client
1380                // and the server and use that to compensate for the difference in time.
1381                if self.ignore_clock_skew && !response.response_header.timestamp.is_null() {
1382                    let offset = response.response_header.timestamp - DateTime::now();
1383                    // Update the client offset by adding the new offset.
1384                    session_state.set_client_offset(offset);
1385                }
1386                session_state.session_id()
1387            };
1388
1389            // session_debug!(self, "Server nonce is {:?}", response.server_nonce);
1390
1391            // The server certificate is validated if the policy requires it
1392            let security_policy = self.security_policy();
1393            let cert_status_code = if security_policy != SecurityPolicy::None {
1394                if let Ok(server_certificate) =
1395                    crypto::X509::from_byte_string(&response.server_certificate)
1396                {
1397                    // Validate server certificate against hostname and application_uri
1398                    let hostname =
1399                        hostname_from_url(self.session_info.endpoint.endpoint_url.as_ref())
1400                            .map_err(|_| StatusCode::BadUnexpectedError)?;
1401                    let application_uri =
1402                        self.session_info.endpoint.server.application_uri.as_ref();
1403
1404                    let certificate_store = trace_write_lock!(self.certificate_store);
1405                    let result = certificate_store.validate_or_reject_application_instance_cert(
1406                        &server_certificate,
1407                        security_policy,
1408                        Some(&hostname),
1409                        Some(application_uri),
1410                    );
1411                    if result.is_bad() {
1412                        result
1413                    } else {
1414                        StatusCode::Good
1415                    }
1416                } else {
1417                    session_error!(self, "Server did not supply a valid X509 certificate");
1418                    StatusCode::BadCertificateInvalid
1419                }
1420            } else {
1421                StatusCode::Good
1422            };
1423
1424            if !cert_status_code.is_good() && !trust_any_server_cert() {
1425                session_error!(self, "Server's certificate was rejected");
1426                Err(cert_status_code)
1427            } else {
1428                // Spawn a task to ping the server to keep the connection alive before the session
1429                // timeout period.
1430                session_debug!(
1431                    self,
1432                    "Revised session timeout is {}",
1433                    response.revised_session_timeout
1434                );
1435                self.spawn_session_activity_task(response.revised_session_timeout);
1436                self.spawn_subscription_activity_task();
1437
1438                // TODO Verify signature using server's public key (from endpoint) comparing with data made from client certificate and nonce.
1439                // crypto::verify_signature_data(verification_key, security_policy, server_certificate, client_certificate, client_nonce);
1440                Ok(session_id)
1441            }
1442        } else {
1443            Err(process_unexpected_response(response))
1444        }
1445    }
1446
1447    fn activate_session(&self) -> Result<(), StatusCode> {
1448        let (user_identity_token, user_token_signature) = {
1449            let secure_channel = trace_read_lock!(self.secure_channel);
1450            self.user_identity_token(&secure_channel.remote_cert(), secure_channel.remote_nonce())?
1451        };
1452
1453        let locale_ids = if self.session_info.preferred_locales.is_empty() {
1454            None
1455        } else {
1456            let locale_ids = self
1457                .session_info
1458                .preferred_locales
1459                .iter()
1460                .map(UAString::from)
1461                .collect();
1462            Some(locale_ids)
1463        };
1464
1465        let security_policy = self.security_policy();
1466        let client_signature = match security_policy {
1467            SecurityPolicy::None => SignatureData::null(),
1468            _ => {
1469                let secure_channel = trace_read_lock!(self.secure_channel);
1470                let server_cert = secure_channel.remote_cert();
1471                let server_nonce = secure_channel.remote_nonce();
1472
1473                let (_, client_pkey) = {
1474                    let certificate_store = trace_write_lock!(self.certificate_store);
1475                    certificate_store.read_own_cert_and_pkey_optional()
1476                };
1477
1478                // Create a signature data
1479                if client_pkey.is_none() {
1480                    session_error!(self, "Cannot create client signature - no pkey!");
1481                    return Err(StatusCode::BadUnexpectedError);
1482                } else if server_cert.is_none() {
1483                    session_error!(
1484                        self,
1485                        "Cannot sign server certificate because server cert is null"
1486                    );
1487                    return Err(StatusCode::BadUnexpectedError);
1488                } else if server_nonce.is_empty() {
1489                    session_error!(
1490                        self,
1491                        "Cannot sign server certificate because server nonce is empty"
1492                    );
1493                    return Err(StatusCode::BadUnexpectedError);
1494                }
1495
1496                let server_cert = secure_channel
1497                    .remote_cert()
1498                    .as_ref()
1499                    .unwrap()
1500                    .as_byte_string();
1501                let server_nonce = ByteString::from(secure_channel.remote_nonce());
1502                let signing_key = client_pkey.as_ref().unwrap();
1503                crypto::create_signature_data(
1504                    signing_key,
1505                    security_policy,
1506                    &server_cert,
1507                    &server_nonce,
1508                )?
1509            }
1510        };
1511
1512        let client_software_certificates = None;
1513
1514        let request = ActivateSessionRequest {
1515            request_header: self.make_request_header(),
1516            client_signature,
1517            client_software_certificates,
1518            locale_ids,
1519            user_identity_token,
1520            user_token_signature,
1521        };
1522
1523        // trace!("ActivateSessionRequest = {:#?}", request);
1524
1525        let response = self.send_request(request)?;
1526        if let SupportedMessage::ActivateSessionResponse(response) = response {
1527            // trace!("ActivateSessionResponse = {:#?}", response);
1528            process_service_result(&response.response_header)?;
1529            Ok(())
1530        } else {
1531            Err(process_unexpected_response(response))
1532        }
1533    }
1534
1535    fn cancel(&self, request_handle: IntegerId) -> Result<u32, StatusCode> {
1536        let request = CancelRequest {
1537            request_header: self.make_request_header(),
1538            request_handle,
1539        };
1540        let response = self.send_request(request)?;
1541        if let SupportedMessage::CancelResponse(response) = response {
1542            process_service_result(&response.response_header)?;
1543            Ok(response.cancel_count)
1544        } else {
1545            Err(process_unexpected_response(response))
1546        }
1547    }
1548}
1549
1550impl SubscriptionService for Session {
1551    fn create_subscription<CB>(
1552        &self,
1553        publishing_interval: f64,
1554        lifetime_count: u32,
1555        max_keep_alive_count: u32,
1556        max_notifications_per_publish: u32,
1557        priority: u8,
1558        publishing_enabled: bool,
1559        callback: CB,
1560    ) -> Result<u32, StatusCode>
1561    where
1562        CB: OnSubscriptionNotification + Send + Sync + 'static,
1563    {
1564        self.create_subscription_inner(
1565            publishing_interval,
1566            lifetime_count,
1567            max_keep_alive_count,
1568            max_notifications_per_publish,
1569            priority,
1570            publishing_enabled,
1571            Arc::new(Mutex::new(callback)),
1572        )
1573    }
1574
1575    fn modify_subscription(
1576        &self,
1577        subscription_id: u32,
1578        publishing_interval: f64,
1579        lifetime_count: u32,
1580        max_keep_alive_count: u32,
1581        max_notifications_per_publish: u32,
1582        priority: u8,
1583    ) -> Result<(), StatusCode> {
1584        if subscription_id == 0 {
1585            session_error!(self, "modify_subscription, subscription id must be non-zero, or the subscription is considered invalid");
1586            Err(StatusCode::BadInvalidArgument)
1587        } else if !self.subscription_exists(subscription_id) {
1588            session_error!(self, "modify_subscription, subscription id does not exist");
1589            Err(StatusCode::BadInvalidArgument)
1590        } else {
1591            let request = ModifySubscriptionRequest {
1592                request_header: self.make_request_header(),
1593                subscription_id,
1594                requested_publishing_interval: publishing_interval,
1595                requested_lifetime_count: lifetime_count,
1596                requested_max_keep_alive_count: max_keep_alive_count,
1597                max_notifications_per_publish,
1598                priority,
1599            };
1600            let response = self.send_request(request)?;
1601            if let SupportedMessage::ModifySubscriptionResponse(response) = response {
1602                process_service_result(&response.response_header)?;
1603                let mut subscription_state = trace_write_lock!(self.subscription_state);
1604                subscription_state.modify_subscription(
1605                    subscription_id,
1606                    response.revised_publishing_interval,
1607                    response.revised_lifetime_count,
1608                    response.revised_max_keep_alive_count,
1609                    max_notifications_per_publish,
1610                    priority,
1611                );
1612                session_debug!(self, "modify_subscription success for {}", subscription_id);
1613                Ok(())
1614            } else {
1615                session_error!(self, "modify_subscription failed {:?}", response);
1616                Err(process_unexpected_response(response))
1617            }
1618        }
1619    }
1620
1621    fn set_publishing_mode(
1622        &self,
1623        subscription_ids: &[u32],
1624        publishing_enabled: bool,
1625    ) -> Result<Vec<StatusCode>, StatusCode> {
1626        session_debug!(
1627            self,
1628            "set_publishing_mode, for subscriptions {:?}, publishing enabled {}",
1629            subscription_ids,
1630            publishing_enabled
1631        );
1632        if subscription_ids.is_empty() {
1633            // No subscriptions
1634            session_error!(
1635                self,
1636                "set_publishing_mode, no subscription ids were provided"
1637            );
1638            Err(StatusCode::BadNothingToDo)
1639        } else {
1640            let request = SetPublishingModeRequest {
1641                request_header: self.make_request_header(),
1642                publishing_enabled,
1643                subscription_ids: Some(subscription_ids.to_vec()),
1644            };
1645            let response = self.send_request(request)?;
1646            if let SupportedMessage::SetPublishingModeResponse(response) = response {
1647                process_service_result(&response.response_header)?;
1648                {
1649                    // Clear out all subscriptions, assuming the delete worked
1650                    let mut subscription_state = trace_write_lock!(self.subscription_state);
1651                    subscription_state.set_publishing_mode(subscription_ids, publishing_enabled);
1652                }
1653                session_debug!(self, "set_publishing_mode success");
1654                Ok(response.results.unwrap())
1655            } else {
1656                session_error!(self, "set_publishing_mode failed {:?}", response);
1657                Err(process_unexpected_response(response))
1658            }
1659        }
1660    }
1661
1662    fn transfer_subscriptions(
1663        &self,
1664        subscription_ids: &[u32],
1665        send_initial_values: bool,
1666    ) -> Result<Vec<TransferResult>, StatusCode> {
1667        if subscription_ids.is_empty() {
1668            // No subscriptions
1669            session_error!(
1670                self,
1671                "set_publishing_mode, no subscription ids were provided"
1672            );
1673            Err(StatusCode::BadNothingToDo)
1674        } else {
1675            let request = TransferSubscriptionsRequest {
1676                request_header: self.make_request_header(),
1677                subscription_ids: Some(subscription_ids.to_vec()),
1678                send_initial_values,
1679            };
1680            let response = self.send_request(request)?;
1681            if let SupportedMessage::TransferSubscriptionsResponse(response) = response {
1682                process_service_result(&response.response_header)?;
1683                session_debug!(self, "transfer_subscriptions success");
1684                Ok(response.results.unwrap())
1685            } else {
1686                session_error!(self, "transfer_subscriptions failed {:?}", response);
1687                Err(process_unexpected_response(response))
1688            }
1689        }
1690    }
1691
1692    fn delete_subscription(&self, subscription_id: u32) -> Result<StatusCode, StatusCode> {
1693        if subscription_id == 0 {
1694            session_error!(self, "delete_subscription, subscription id 0 is invalid");
1695            Err(StatusCode::BadInvalidArgument)
1696        } else if !self.subscription_exists(subscription_id) {
1697            session_error!(
1698                self,
1699                "delete_subscription, subscription id {} does not exist",
1700                subscription_id
1701            );
1702            Err(StatusCode::BadInvalidArgument)
1703        } else {
1704            let result = self.delete_subscriptions(&[subscription_id][..])?;
1705            Ok(result[0])
1706        }
1707    }
1708
1709    fn delete_subscriptions(
1710        &self,
1711        subscription_ids: &[u32],
1712    ) -> Result<Vec<StatusCode>, StatusCode> {
1713        if subscription_ids.is_empty() {
1714            // No subscriptions
1715            session_trace!(self, "delete_subscriptions with no subscriptions");
1716            Err(StatusCode::BadNothingToDo)
1717        } else {
1718            // Send a delete request holding all the subscription ides that we wish to delete
1719            let request = DeleteSubscriptionsRequest {
1720                request_header: self.make_request_header(),
1721                subscription_ids: Some(subscription_ids.to_vec()),
1722            };
1723            let response = self.send_request(request)?;
1724            if let SupportedMessage::DeleteSubscriptionsResponse(response) = response {
1725                process_service_result(&response.response_header)?;
1726                {
1727                    // Clear out deleted subscriptions, assuming the delete worked
1728                    let mut subscription_state = trace_write_lock!(self.subscription_state);
1729                    subscription_ids.iter().for_each(|id| {
1730                        let _ = subscription_state.delete_subscription(*id);
1731                    });
1732                }
1733                session_debug!(self, "delete_subscriptions success");
1734                Ok(response.results.unwrap())
1735            } else {
1736                session_error!(self, "delete_subscriptions failed {:?}", response);
1737                Err(process_unexpected_response(response))
1738            }
1739        }
1740    }
1741}
1742
1743impl NodeManagementService for Session {
1744    fn add_nodes(&self, nodes_to_add: &[AddNodesItem]) -> Result<Vec<AddNodesResult>, StatusCode> {
1745        if nodes_to_add.is_empty() {
1746            session_error!(self, "add_nodes, called with no nodes to add");
1747            Err(StatusCode::BadNothingToDo)
1748        } else {
1749            let request = AddNodesRequest {
1750                request_header: self.make_request_header(),
1751                nodes_to_add: Some(nodes_to_add.to_vec()),
1752            };
1753            let response = self.send_request(request)?;
1754            if let SupportedMessage::AddNodesResponse(response) = response {
1755                Ok(response.results.unwrap())
1756            } else {
1757                Err(process_unexpected_response(response))
1758            }
1759        }
1760    }
1761
1762    fn add_references(
1763        &self,
1764        references_to_add: &[AddReferencesItem],
1765    ) -> Result<Vec<StatusCode>, StatusCode> {
1766        if references_to_add.is_empty() {
1767            session_error!(self, "add_references, called with no references to add");
1768            Err(StatusCode::BadNothingToDo)
1769        } else {
1770            let request = AddReferencesRequest {
1771                request_header: self.make_request_header(),
1772                references_to_add: Some(references_to_add.to_vec()),
1773            };
1774            let response = self.send_request(request)?;
1775            if let SupportedMessage::AddReferencesResponse(response) = response {
1776                Ok(response.results.unwrap())
1777            } else {
1778                Err(process_unexpected_response(response))
1779            }
1780        }
1781    }
1782
1783    fn delete_nodes(
1784        &self,
1785        nodes_to_delete: &[DeleteNodesItem],
1786    ) -> Result<Vec<StatusCode>, StatusCode> {
1787        if nodes_to_delete.is_empty() {
1788            session_error!(self, "delete_nodes, called with no nodes to delete");
1789            Err(StatusCode::BadNothingToDo)
1790        } else {
1791            let request = DeleteNodesRequest {
1792                request_header: self.make_request_header(),
1793                nodes_to_delete: Some(nodes_to_delete.to_vec()),
1794            };
1795            let response = self.send_request(request)?;
1796            if let SupportedMessage::DeleteNodesResponse(response) = response {
1797                Ok(response.results.unwrap())
1798            } else {
1799                Err(process_unexpected_response(response))
1800            }
1801        }
1802    }
1803
1804    fn delete_references(
1805        &self,
1806        references_to_delete: &[DeleteReferencesItem],
1807    ) -> Result<Vec<StatusCode>, StatusCode> {
1808        if references_to_delete.is_empty() {
1809            session_error!(
1810                self,
1811                "delete_references, called with no references to delete"
1812            );
1813            Err(StatusCode::BadNothingToDo)
1814        } else {
1815            let request = DeleteReferencesRequest {
1816                request_header: self.make_request_header(),
1817                references_to_delete: Some(references_to_delete.to_vec()),
1818            };
1819            let response = self.send_request(request)?;
1820            if let SupportedMessage::DeleteReferencesResponse(response) = response {
1821                Ok(response.results.unwrap())
1822            } else {
1823                Err(process_unexpected_response(response))
1824            }
1825        }
1826    }
1827}
1828
1829impl MonitoredItemService for Session {
1830    fn create_monitored_items(
1831        &self,
1832        subscription_id: u32,
1833        timestamps_to_return: TimestampsToReturn,
1834        items_to_create: &[MonitoredItemCreateRequest],
1835    ) -> Result<Vec<MonitoredItemCreateResult>, StatusCode> {
1836        session_debug!(
1837            self,
1838            "create_monitored_items, for subscription {}, {} items",
1839            subscription_id,
1840            items_to_create.len()
1841        );
1842        if subscription_id == 0 {
1843            session_error!(self, "create_monitored_items, subscription id 0 is invalid");
1844            Err(StatusCode::BadInvalidArgument)
1845        } else if !self.subscription_exists(subscription_id) {
1846            session_error!(
1847                self,
1848                "create_monitored_items, subscription id {} does not exist",
1849                subscription_id
1850            );
1851            Err(StatusCode::BadInvalidArgument)
1852        } else if items_to_create.is_empty() {
1853            session_error!(
1854                self,
1855                "create_monitored_items, called with no items to create"
1856            );
1857            Err(StatusCode::BadNothingToDo)
1858        } else {
1859            // Assign each item a unique client handle
1860            let mut items_to_create = items_to_create.to_vec();
1861            {
1862                let mut session_state = trace_write_lock!(self.session_state);
1863                items_to_create.iter_mut().for_each(|i| {
1864                    //if user doesn't specify a valid client_handle
1865                    if i.requested_parameters.client_handle == 0 {
1866                        i.requested_parameters.client_handle =
1867                            session_state.next_monitored_item_handle();
1868                    }
1869                });
1870            }
1871
1872            let request = CreateMonitoredItemsRequest {
1873                request_header: self.make_request_header(),
1874                subscription_id,
1875                timestamps_to_return,
1876                items_to_create: Some(items_to_create.clone()),
1877            };
1878            let response = self.send_request(request)?;
1879            if let SupportedMessage::CreateMonitoredItemsResponse(response) = response {
1880                process_service_result(&response.response_header)?;
1881                if let Some(ref results) = response.results {
1882                    session_debug!(
1883                        self,
1884                        "create_monitored_items, {} items created",
1885                        items_to_create.len()
1886                    );
1887                    // Set the items in our internal state
1888                    let items_to_create = items_to_create
1889                        .iter()
1890                        .zip(results)
1891                        .map(|(i, r)| subscription::CreateMonitoredItem {
1892                            id: r.monitored_item_id,
1893                            client_handle: i.requested_parameters.client_handle,
1894                            discard_oldest: i.requested_parameters.discard_oldest,
1895                            item_to_monitor: i.item_to_monitor.clone(),
1896                            monitoring_mode: i.monitoring_mode,
1897                            queue_size: r.revised_queue_size,
1898                            sampling_interval: r.revised_sampling_interval,
1899                        })
1900                        .collect::<Vec<subscription::CreateMonitoredItem>>();
1901                    {
1902                        let mut subscription_state = trace_write_lock!(self.subscription_state);
1903                        subscription_state
1904                            .insert_monitored_items(subscription_id, &items_to_create);
1905                    }
1906                } else {
1907                    session_debug!(
1908                        self,
1909                        "create_monitored_items, success but no monitored items were created"
1910                    );
1911                }
1912                Ok(response.results.unwrap())
1913            } else {
1914                session_error!(self, "create_monitored_items failed {:?}", response);
1915                Err(process_unexpected_response(response))
1916            }
1917        }
1918    }
1919
1920    fn modify_monitored_items(
1921        &self,
1922        subscription_id: u32,
1923        timestamps_to_return: TimestampsToReturn,
1924        items_to_modify: &[MonitoredItemModifyRequest],
1925    ) -> Result<Vec<MonitoredItemModifyResult>, StatusCode> {
1926        session_debug!(
1927            self,
1928            "modify_monitored_items, for subscription {}, {} items",
1929            subscription_id,
1930            items_to_modify.len()
1931        );
1932        if subscription_id == 0 {
1933            session_error!(self, "modify_monitored_items, subscription id 0 is invalid");
1934            Err(StatusCode::BadInvalidArgument)
1935        } else if !self.subscription_exists(subscription_id) {
1936            session_error!(
1937                self,
1938                "modify_monitored_items, subscription id {} does not exist",
1939                subscription_id
1940            );
1941            Err(StatusCode::BadInvalidArgument)
1942        } else if items_to_modify.is_empty() {
1943            session_error!(
1944                self,
1945                "modify_monitored_items, called with no items to modify"
1946            );
1947            Err(StatusCode::BadNothingToDo)
1948        } else {
1949            let monitored_item_ids = items_to_modify
1950                .iter()
1951                .map(|i| i.monitored_item_id)
1952                .collect::<Vec<u32>>();
1953            let request = ModifyMonitoredItemsRequest {
1954                request_header: self.make_request_header(),
1955                subscription_id,
1956                timestamps_to_return,
1957                items_to_modify: Some(items_to_modify.to_vec()),
1958            };
1959            let response = self.send_request(request)?;
1960            if let SupportedMessage::ModifyMonitoredItemsResponse(response) = response {
1961                process_service_result(&response.response_header)?;
1962                if let Some(ref results) = response.results {
1963                    // Set the items in our internal state
1964                    let items_to_modify = monitored_item_ids
1965                        .iter()
1966                        .zip(results.iter())
1967                        .map(|(id, r)| subscription::ModifyMonitoredItem {
1968                            id: *id,
1969                            queue_size: r.revised_queue_size,
1970                            sampling_interval: r.revised_sampling_interval,
1971                        })
1972                        .collect::<Vec<subscription::ModifyMonitoredItem>>();
1973                    {
1974                        let mut subscription_state = trace_write_lock!(self.subscription_state);
1975                        subscription_state
1976                            .modify_monitored_items(subscription_id, &items_to_modify);
1977                    }
1978                }
1979                session_debug!(self, "modify_monitored_items, success");
1980                Ok(response.results.unwrap())
1981            } else {
1982                session_error!(self, "modify_monitored_items failed {:?}", response);
1983                Err(process_unexpected_response(response))
1984            }
1985        }
1986    }
1987
1988    fn set_monitoring_mode(
1989        &self,
1990        subscription_id: u32,
1991        monitoring_mode: MonitoringMode,
1992        monitored_item_ids: &[u32],
1993    ) -> Result<Vec<StatusCode>, StatusCode> {
1994        if monitored_item_ids.is_empty() {
1995            session_error!(self, "set_monitoring_mode, called with nothing to do");
1996            Err(StatusCode::BadNothingToDo)
1997        } else {
1998            let request = {
1999                let monitored_item_ids = Some(monitored_item_ids.to_vec());
2000                SetMonitoringModeRequest {
2001                    request_header: self.make_request_header(),
2002                    subscription_id,
2003                    monitoring_mode,
2004                    monitored_item_ids,
2005                }
2006            };
2007            let response = self.send_request(request)?;
2008            if let SupportedMessage::SetMonitoringModeResponse(response) = response {
2009                Ok(response.results.unwrap())
2010            } else {
2011                session_error!(self, "set_monitoring_mode failed {:?}", response);
2012                Err(process_unexpected_response(response))
2013            }
2014        }
2015    }
2016
2017    fn set_triggering(
2018        &self,
2019        subscription_id: u32,
2020        triggering_item_id: u32,
2021        links_to_add: &[u32],
2022        links_to_remove: &[u32],
2023    ) -> Result<(Option<Vec<StatusCode>>, Option<Vec<StatusCode>>), StatusCode> {
2024        if links_to_add.is_empty() && links_to_remove.is_empty() {
2025            session_error!(self, "set_triggering, called with nothing to add or remove");
2026            Err(StatusCode::BadNothingToDo)
2027        } else {
2028            let request = {
2029                let links_to_add = if links_to_add.is_empty() {
2030                    None
2031                } else {
2032                    Some(links_to_add.to_vec())
2033                };
2034                let links_to_remove = if links_to_remove.is_empty() {
2035                    None
2036                } else {
2037                    Some(links_to_remove.to_vec())
2038                };
2039                SetTriggeringRequest {
2040                    request_header: self.make_request_header(),
2041                    subscription_id,
2042                    triggering_item_id,
2043                    links_to_add,
2044                    links_to_remove,
2045                }
2046            };
2047            let response = self.send_request(request)?;
2048            if let SupportedMessage::SetTriggeringResponse(response) = response {
2049                // Update client side state
2050                let mut subscription_state = trace_write_lock!(self.subscription_state);
2051                subscription_state.set_triggering(
2052                    subscription_id,
2053                    triggering_item_id,
2054                    links_to_add,
2055                    links_to_remove,
2056                );
2057                Ok((response.add_results, response.remove_results))
2058            } else {
2059                session_error!(self, "set_triggering failed {:?}", response);
2060                Err(process_unexpected_response(response))
2061            }
2062        }
2063    }
2064
2065    fn delete_monitored_items(
2066        &self,
2067        subscription_id: u32,
2068        items_to_delete: &[u32],
2069    ) -> Result<Vec<StatusCode>, StatusCode> {
2070        session_debug!(
2071            self,
2072            "delete_monitored_items, subscription {} for {} items",
2073            subscription_id,
2074            items_to_delete.len()
2075        );
2076        if subscription_id == 0 {
2077            session_error!(self, "delete_monitored_items, subscription id 0 is invalid");
2078            Err(StatusCode::BadInvalidArgument)
2079        } else if !self.subscription_exists(subscription_id) {
2080            session_error!(
2081                self,
2082                "delete_monitored_items, subscription id {} does not exist",
2083                subscription_id
2084            );
2085            Err(StatusCode::BadInvalidArgument)
2086        } else if items_to_delete.is_empty() {
2087            session_error!(
2088                self,
2089                "delete_monitored_items, called with no items to delete"
2090            );
2091            Err(StatusCode::BadNothingToDo)
2092        } else {
2093            let request = DeleteMonitoredItemsRequest {
2094                request_header: self.make_request_header(),
2095                subscription_id,
2096                monitored_item_ids: Some(items_to_delete.to_vec()),
2097            };
2098            let response = self.send_request(request)?;
2099            if let SupportedMessage::DeleteMonitoredItemsResponse(response) = response {
2100                process_service_result(&response.response_header)?;
2101                if response.results.is_some() {
2102                    let mut subscription_state = trace_write_lock!(self.subscription_state);
2103                    subscription_state.delete_monitored_items(subscription_id, items_to_delete);
2104                }
2105                session_debug!(self, "delete_monitored_items, success");
2106                Ok(response.results.unwrap())
2107            } else {
2108                session_error!(self, "delete_monitored_items failed {:?}", response);
2109                Err(process_unexpected_response(response))
2110            }
2111        }
2112    }
2113}
2114
2115impl ViewService for Session {
2116    fn browse(
2117        &self,
2118        nodes_to_browse: &[BrowseDescription],
2119    ) -> Result<Option<Vec<BrowseResult>>, StatusCode> {
2120        if nodes_to_browse.is_empty() {
2121            session_error!(self, "browse, was not supplied with any nodes to browse");
2122            Err(StatusCode::BadNothingToDo)
2123        } else {
2124            let request = BrowseRequest {
2125                request_header: self.make_request_header(),
2126                view: ViewDescription {
2127                    view_id: NodeId::null(),
2128                    timestamp: DateTime::null(),
2129                    view_version: 0,
2130                },
2131                requested_max_references_per_node: 1000,
2132                nodes_to_browse: Some(nodes_to_browse.to_vec()),
2133            };
2134            let response = self.send_request(request)?;
2135            if let SupportedMessage::BrowseResponse(response) = response {
2136                session_debug!(self, "browse, success");
2137                process_service_result(&response.response_header)?;
2138                Ok(response.results)
2139            } else {
2140                session_error!(self, "browse failed {:?}", response);
2141                Err(process_unexpected_response(response))
2142            }
2143        }
2144    }
2145
2146    fn browse_next(
2147        &self,
2148        release_continuation_points: bool,
2149        continuation_points: &[ByteString],
2150    ) -> Result<Option<Vec<BrowseResult>>, StatusCode> {
2151        if continuation_points.is_empty() {
2152            Err(StatusCode::BadNothingToDo)
2153        } else {
2154            let request = BrowseNextRequest {
2155                request_header: self.make_request_header(),
2156                continuation_points: Some(continuation_points.to_vec()),
2157                release_continuation_points,
2158            };
2159            let response = self.send_request(request)?;
2160            if let SupportedMessage::BrowseNextResponse(response) = response {
2161                session_debug!(self, "browse_next, success");
2162                process_service_result(&response.response_header)?;
2163                Ok(response.results)
2164            } else {
2165                session_error!(self, "browse_next failed {:?}", response);
2166                Err(process_unexpected_response(response))
2167            }
2168        }
2169    }
2170
2171    fn translate_browse_paths_to_node_ids(
2172        &self,
2173        browse_paths: &[BrowsePath],
2174    ) -> Result<Vec<BrowsePathResult>, StatusCode> {
2175        if browse_paths.is_empty() {
2176            session_error!(
2177                self,
2178                "translate_browse_paths_to_node_ids, was not supplied with any browse paths"
2179            );
2180            Err(StatusCode::BadNothingToDo)
2181        } else {
2182            let request = TranslateBrowsePathsToNodeIdsRequest {
2183                request_header: self.make_request_header(),
2184                browse_paths: Some(browse_paths.to_vec()),
2185            };
2186            let response = self.send_request(request)?;
2187            if let SupportedMessage::TranslateBrowsePathsToNodeIdsResponse(response) = response {
2188                session_debug!(self, "translate_browse_paths_to_node_ids, success");
2189                process_service_result(&response.response_header)?;
2190                Ok(response.results.unwrap_or_default())
2191            } else {
2192                session_error!(
2193                    self,
2194                    "translate_browse_paths_to_node_ids failed {:?}",
2195                    response
2196                );
2197                Err(process_unexpected_response(response))
2198            }
2199        }
2200    }
2201
2202    fn register_nodes(&self, nodes_to_register: &[NodeId]) -> Result<Vec<NodeId>, StatusCode> {
2203        if nodes_to_register.is_empty() {
2204            session_error!(
2205                self,
2206                "register_nodes, was not supplied with any nodes to register"
2207            );
2208            Err(StatusCode::BadNothingToDo)
2209        } else {
2210            let request = RegisterNodesRequest {
2211                request_header: self.make_request_header(),
2212                nodes_to_register: Some(nodes_to_register.to_vec()),
2213            };
2214            let response = self.send_request(request)?;
2215            if let SupportedMessage::RegisterNodesResponse(response) = response {
2216                session_debug!(self, "register_nodes, success");
2217                process_service_result(&response.response_header)?;
2218                Ok(response.registered_node_ids.unwrap())
2219            } else {
2220                session_error!(self, "register_nodes failed {:?}", response);
2221                Err(process_unexpected_response(response))
2222            }
2223        }
2224    }
2225
2226    fn unregister_nodes(&self, nodes_to_unregister: &[NodeId]) -> Result<(), StatusCode> {
2227        if nodes_to_unregister.is_empty() {
2228            session_error!(
2229                self,
2230                "unregister_nodes, was not supplied with any nodes to unregister"
2231            );
2232            Err(StatusCode::BadNothingToDo)
2233        } else {
2234            let request = UnregisterNodesRequest {
2235                request_header: self.make_request_header(),
2236                nodes_to_unregister: Some(nodes_to_unregister.to_vec()),
2237            };
2238            let response = self.send_request(request)?;
2239            if let SupportedMessage::UnregisterNodesResponse(response) = response {
2240                session_debug!(self, "unregister_nodes, success");
2241                process_service_result(&response.response_header)?;
2242                Ok(())
2243            } else {
2244                session_error!(self, "unregister_nodes failed {:?}", response);
2245                Err(process_unexpected_response(response))
2246            }
2247        }
2248    }
2249}
2250
2251impl MethodService for Session {
2252    fn call<T>(&self, method: T) -> Result<CallMethodResult, StatusCode>
2253    where
2254        T: Into<CallMethodRequest>,
2255    {
2256        session_debug!(self, "call()");
2257        let methods_to_call = Some(vec![method.into()]);
2258        let request = CallRequest {
2259            request_header: self.make_request_header(),
2260            methods_to_call,
2261        };
2262        let response = self.send_request(request)?;
2263        if let SupportedMessage::CallResponse(response) = response {
2264            if let Some(mut results) = response.results {
2265                if results.len() != 1 {
2266                    session_error!(
2267                        self,
2268                        "call(), expecting a result from the call to the server, got {} results",
2269                        results.len()
2270                    );
2271                    Err(StatusCode::BadUnexpectedError)
2272                } else {
2273                    Ok(results.remove(0))
2274                }
2275            } else {
2276                session_error!(
2277                    self,
2278                    "call(), expecting a result from the call to the server, got nothing"
2279                );
2280                Err(StatusCode::BadUnexpectedError)
2281            }
2282        } else {
2283            Err(process_unexpected_response(response))
2284        }
2285    }
2286}
2287
2288impl AttributeService for Session {
2289    fn read(
2290        &self,
2291        nodes_to_read: &[ReadValueId],
2292        timestamps_to_return: TimestampsToReturn,
2293        max_age: f64,
2294    ) -> Result<Vec<DataValue>, StatusCode> {
2295        if nodes_to_read.is_empty() {
2296            // No subscriptions
2297            session_error!(self, "read(), was not supplied with any nodes to read");
2298            Err(StatusCode::BadNothingToDo)
2299        } else {
2300            session_debug!(self, "read() requested to read nodes {:?}", nodes_to_read);
2301            let request = ReadRequest {
2302                request_header: self.make_request_header(),
2303                max_age,
2304                timestamps_to_return,
2305                nodes_to_read: Some(nodes_to_read.to_vec()),
2306            };
2307            let response = self.send_request(request)?;
2308            if let SupportedMessage::ReadResponse(response) = response {
2309                session_debug!(self, "read(), success");
2310                process_service_result(&response.response_header)?;
2311                let results = if let Some(results) = response.results {
2312                    results
2313                } else {
2314                    Vec::new()
2315                };
2316                Ok(results)
2317            } else {
2318                session_error!(self, "read() value failed");
2319                Err(process_unexpected_response(response))
2320            }
2321        }
2322    }
2323
2324    fn history_read(
2325        &self,
2326        history_read_details: HistoryReadAction,
2327        timestamps_to_return: TimestampsToReturn,
2328        release_continuation_points: bool,
2329        nodes_to_read: &[HistoryReadValueId],
2330    ) -> Result<Vec<HistoryReadResult>, StatusCode> {
2331        // Turn the enum into an extension object
2332        let history_read_details = ExtensionObject::from(history_read_details);
2333        let request = HistoryReadRequest {
2334            request_header: self.make_request_header(),
2335            history_read_details,
2336            timestamps_to_return,
2337            release_continuation_points,
2338            nodes_to_read: if nodes_to_read.is_empty() {
2339                None
2340            } else {
2341                Some(nodes_to_read.to_vec())
2342            },
2343        };
2344        session_debug!(
2345            self,
2346            "history_read() requested to read nodes {:?}",
2347            nodes_to_read
2348        );
2349        let response = self.send_request(request)?;
2350        if let SupportedMessage::HistoryReadResponse(response) = response {
2351            session_debug!(self, "history_read(), success");
2352            process_service_result(&response.response_header)?;
2353            let results = if let Some(results) = response.results {
2354                results
2355            } else {
2356                Vec::new()
2357            };
2358            Ok(results)
2359        } else {
2360            session_error!(self, "history_read() value failed");
2361            Err(process_unexpected_response(response))
2362        }
2363    }
2364
2365    fn write(&self, nodes_to_write: &[WriteValue]) -> Result<Vec<StatusCode>, StatusCode> {
2366        if nodes_to_write.is_empty() {
2367            // No subscriptions
2368            session_error!(self, "write() was not supplied with any nodes to write");
2369            Err(StatusCode::BadNothingToDo)
2370        } else {
2371            let request = WriteRequest {
2372                request_header: self.make_request_header(),
2373                nodes_to_write: Some(nodes_to_write.to_vec()),
2374            };
2375            let response = self.send_request(request)?;
2376            if let SupportedMessage::WriteResponse(response) = response {
2377                session_debug!(self, "write(), success");
2378                process_service_result(&response.response_header)?;
2379                Ok(response.results.unwrap_or_default())
2380            } else {
2381                session_error!(self, "write() failed {:?}", response);
2382                Err(process_unexpected_response(response))
2383            }
2384        }
2385    }
2386
2387    fn history_update(
2388        &self,
2389        history_update_details: &[HistoryUpdateAction],
2390    ) -> Result<Vec<HistoryUpdateResult>, StatusCode> {
2391        if history_update_details.is_empty() {
2392            // No subscriptions
2393            session_error!(
2394                self,
2395                "history_update(), was not supplied with any detail to update"
2396            );
2397            Err(StatusCode::BadNothingToDo)
2398        } else {
2399            // Turn the enums into ExtensionObjects
2400            let history_update_details = history_update_details
2401                .iter()
2402                .map(|action| ExtensionObject::from(action))
2403                .collect::<Vec<ExtensionObject>>();
2404
2405            let request = HistoryUpdateRequest {
2406                request_header: self.make_request_header(),
2407                history_update_details: Some(history_update_details.to_vec()),
2408            };
2409            let response = self.send_request(request)?;
2410            if let SupportedMessage::HistoryUpdateResponse(response) = response {
2411                session_debug!(self, "history_update(), success");
2412                process_service_result(&response.response_header)?;
2413                let results = if let Some(results) = response.results {
2414                    results
2415                } else {
2416                    Vec::new()
2417                };
2418                Ok(results)
2419            } else {
2420                session_error!(self, "history_update() failed {:?}", response);
2421                Err(process_unexpected_response(response))
2422            }
2423        }
2424    }
2425}