opcua_client/session/
connect.rs1use std::sync::Arc;
2
3use tokio::{pin, select};
4use tracing::info;
5
6use crate::transport::{Connector, SecureChannelEventLoop, TransportPollResult};
7use opcua_types::{NodeId, StatusCode};
8
9use super::Session;
10
11pub(super) struct SessionConnector {
14 inner: Arc<Session>,
15}
16
17#[derive(Debug, Clone)]
20pub enum SessionConnectMode {
21 NewSession(NodeId),
23 ReactivatedSession(NodeId),
25}
26
27impl SessionConnector {
28 pub(super) fn new(session: Arc<Session>) -> Self {
29 Self { inner: session }
30 }
31
32 pub(super) async fn try_connect<T: Connector>(
33 &self,
34 connector: &T,
35 ) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
36 self.connect_and_activate(connector).await
37 }
38
39 async fn connect_and_activate<T: Connector>(
40 &self,
41 connector: &T,
42 ) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
43 let mut event_loop = self.inner.channel.connect_no_retry(connector).await?;
44
45 let activate_fut = self.ensure_and_activate_session();
46 pin!(activate_fut);
47
48 let res = loop {
49 select! {
50 r = event_loop.poll() => {
51 if let TransportPollResult::Closed(c) = r {
52 return Err(c);
53 }
54 },
55 r = &mut activate_fut => break r,
56 }
57 };
58
59 let id = match res {
60 Ok(id) => id,
61 Err(e) => {
62 self.inner.channel.close_channel().await;
63
64 loop {
65 if matches!(event_loop.poll().await, TransportPollResult::Closed(_)) {
66 break;
67 }
68 }
69
70 return Err(e);
71 }
72 };
73
74 Ok((event_loop, id))
75 }
76
77 async fn ensure_and_activate_session(&self) -> Result<SessionConnectMode, StatusCode> {
78 let should_create_session = self.inner.session_id.load().is_null();
79
80 if should_create_session {
81 self.inner.create_session().await?;
82 }
83
84 let reconnect = match self.inner.activate_session().await {
85 Err(status_code) if !should_create_session => {
86 info!(
87 "Session activation failed on reconnect, error = {}, creating a new session",
88 status_code
89 );
90 self.inner.reset();
91 let id = self.inner.create_session().await?;
92 self.inner.activate_session().await?;
93 SessionConnectMode::NewSession(id)
94 }
95 Err(e) => return Err(e),
96 Ok(_) => {
97 let session_id = (**self.inner.session_id.load()).clone();
98 if should_create_session {
99 SessionConnectMode::NewSession(session_id)
100 } else {
101 SessionConnectMode::ReactivatedSession(session_id)
102 }
103 }
104 };
105
106 if self.inner.recreate_subscriptions {
107 self.inner.transfer_subscriptions_from_old_session().await;
108 }
109
110 Ok(reconnect)
111 }
112}