use std::sync::Arc;
use tokio::{pin, select};
use tracing::info;
use crate::transport::{Connector, SecureChannelEventLoop, TransportPollResult};
use opcua_types::{NodeId, StatusCode};
use super::Session;
pub(super) struct SessionConnector {
inner: Arc<Session>,
}
#[derive(Debug, Clone)]
pub enum SessionConnectMode {
NewSession(NodeId),
ReactivatedSession(NodeId),
}
impl SessionConnector {
pub(super) fn new(session: Arc<Session>) -> Self {
Self { inner: session }
}
pub(super) async fn try_connect<T: Connector>(
&self,
connector: &T,
) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
self.connect_and_activate(connector).await
}
async fn connect_and_activate<T: Connector>(
&self,
connector: &T,
) -> Result<(SecureChannelEventLoop<T::Transport>, SessionConnectMode), StatusCode> {
let mut event_loop = self.inner.channel.connect_no_retry(connector).await?;
let activate_fut = self.ensure_and_activate_session();
pin!(activate_fut);
let res = loop {
select! {
r = event_loop.poll() => {
if let TransportPollResult::Closed(c) = r {
return Err(c);
}
},
r = &mut activate_fut => break r,
}
};
let id = match res {
Ok(id) => id,
Err(e) => {
self.inner.channel.close_channel().await;
loop {
if matches!(event_loop.poll().await, TransportPollResult::Closed(_)) {
break;
}
}
return Err(e);
}
};
Ok((event_loop, id))
}
async fn ensure_and_activate_session(&self) -> Result<SessionConnectMode, StatusCode> {
let should_create_session = self.inner.session_id.load().is_null();
if should_create_session {
self.inner.create_session().await?;
}
let reconnect = match self.inner.activate_session().await {
Err(status_code) if !should_create_session => {
info!(
"Session activation failed on reconnect, error = {}, creating a new session",
status_code
);
self.inner.reset();
let id = self.inner.create_session().await?;
self.inner.activate_session().await?;
SessionConnectMode::NewSession(id)
}
Err(e) => return Err(e),
Ok(_) => {
let session_id = (**self.inner.session_id.load()).clone();
if should_create_session {
SessionConnectMode::NewSession(session_id)
} else {
SessionConnectMode::ReactivatedSession(session_id)
}
}
};
if self.inner.recreate_subscriptions {
self.inner.transfer_subscriptions_from_old_session().await;
}
Ok(reconnect)
}
}