Skip to main content

opcua_client/session/
connect.rs

1use std::sync::Arc;
2
3use tokio::{pin, select};
4use tracing::info;
5
6use crate::transport::{Connector, SecureChannelEventLoop, TransportPollResult};
7use opcua_types::{NodeId, StatusCode};
8
9use super::Session;
10
11/// This struct manages the task of connecting to the server.
12/// It will only make a single attempt, so whatever is calling it is responsible for retries.
13pub(super) struct SessionConnector {
14    inner: Arc<Session>,
15}
16
17/// When the session connects to the server, this describes
18/// how that happened, whether a new session was created, or an old session was reactivated.
19#[derive(Debug, Clone)]
20pub enum SessionConnectMode {
21    /// A new session was created with session ID given by the inner [`NodeId`]
22    NewSession(NodeId),
23    /// An old session was reactivated with session ID given by the inner [`NodeId`]
24    ReactivatedSession(NodeId),
25}
26
27impl SessionConnector {
28    pub(super) fn new(session: Arc<Session>) -> Self {
29        Self { inner: session }
30    }
31
32    pub(super) async fn try_connect<T: Connector>(
33        &self,
34        connector: &T,
35    ) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
36        self.connect_and_activate(connector).await
37    }
38
39    async fn connect_and_activate<T: Connector>(
40        &self,
41        connector: &T,
42    ) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
43        let mut event_loop = self.inner.channel.connect_no_retry(connector).await?;
44
45        let activate_fut = self.ensure_and_activate_session();
46        pin!(activate_fut);
47
48        let res = loop {
49            select! {
50                r = event_loop.poll() => {
51                    if let TransportPollResult::Closed(c) = r {
52                        return Err(c);
53                    }
54                },
55                r = &mut activate_fut => break r,
56            }
57        };
58
59        let id = match res {
60            Ok(id) => id,
61            Err(e) => {
62                self.inner.channel.close_channel().await;
63
64                loop {
65                    if matches!(event_loop.poll().await, TransportPollResult::Closed(_)) {
66                        break;
67                    }
68                }
69
70                return Err(e);
71            }
72        };
73
74        Ok((event_loop, id))
75    }
76
77    async fn ensure_and_activate_session(&self) -> Result<SessionConnectMode, StatusCode> {
78        let should_create_session = self.inner.session_id.load().is_null();
79
80        if should_create_session {
81            self.inner.create_session().await?;
82        }
83
84        let reconnect = match self.inner.activate_session().await {
85            Err(status_code) if !should_create_session => {
86                info!(
87                    "Session activation failed on reconnect, error = {}, creating a new session",
88                    status_code
89                );
90                self.inner.reset();
91                let id = self.inner.create_session().await?;
92                self.inner.activate_session().await?;
93                SessionConnectMode::NewSession(id)
94            }
95            Err(e) => return Err(e),
96            Ok(_) => {
97                let session_id = (**self.inner.session_id.load()).clone();
98                if should_create_session {
99                    SessionConnectMode::NewSession(session_id)
100                } else {
101                    SessionConnectMode::ReactivatedSession(session_id)
102                }
103            }
104        };
105
106        if self.inner.recreate_subscriptions {
107            self.inner.transfer_subscriptions_from_old_session().await;
108        }
109
110        Ok(reconnect)
111    }
112}