use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
use tracing::error;
use super::continuation_points::ContinuationPoint;
use super::manager::next_session_id;
use crate::authenticator::UserToken;
use crate::identity_token::IdentityToken;
use crate::info::ServerInfo;
use crate::node_manager::{BrowseContinuationPoint, QueryContinuationPoint};
use opcua_crypto::X509;
use opcua_types::{
ApplicationDescription, ByteString, MessageSecurityMode, NodeId, StatusCode, UAString,
};
pub struct Session {
session_id: NodeId,
session_id_numeric: u32,
security_policy_uri: String,
secure_channel_id: u32,
client_certificate: Option<X509>,
pub(super) authentication_token: NodeId,
session_nonce: ByteString,
session_name: UAString,
session_timeout: Duration,
user_identity: IdentityToken,
locale_ids: Option<Vec<UAString>>,
max_request_message_size: u32,
max_response_message_size: u32,
endpoint_url: UAString,
max_browse_continuation_points: usize,
max_history_continuation_points: usize,
max_query_continuation_points: usize,
application_description: ApplicationDescription,
message_security_mode: MessageSecurityMode,
last_service_request: ArcSwap<Instant>,
browse_continuation_points: HashMap<ByteString, BrowseContinuationPoint>,
history_continuation_points: HashMap<ByteString, ContinuationPoint>,
query_continuation_points: HashMap<ByteString, QueryContinuationPoint>,
user_token: Option<UserToken>,
is_closed: bool,
}
impl Session {
#[allow(clippy::too_many_arguments)]
pub(crate) fn create(
info: &ServerInfo,
authentication_token: NodeId,
secure_channel_id: u32,
session_timeout: u64,
max_request_message_size: u32,
max_response_message_size: u32,
endpoint_url: UAString,
security_policy_uri: String,
user_identity: IdentityToken,
client_certificate: Option<X509>,
session_nonce: ByteString,
session_name: UAString,
application_description: ApplicationDescription,
message_security_mode: MessageSecurityMode,
) -> Self {
let (session_id, session_id_numeric) = next_session_id();
Self {
session_id,
session_id_numeric,
security_policy_uri,
secure_channel_id,
client_certificate,
authentication_token,
session_nonce,
session_name,
session_timeout: if session_timeout == 0 {
Duration::from_millis(info.config.max_session_timeout_ms)
} else {
Duration::from_millis(session_timeout)
},
last_service_request: ArcSwap::new(Arc::new(Instant::now())),
user_identity,
locale_ids: None,
max_request_message_size,
max_response_message_size,
endpoint_url,
max_browse_continuation_points: info.config.limits.max_browse_continuation_points,
max_history_continuation_points: info.config.limits.max_history_continuation_points,
max_query_continuation_points: info.config.limits.max_query_continuation_points,
browse_continuation_points: Default::default(),
history_continuation_points: Default::default(),
query_continuation_points: Default::default(),
user_token: None,
application_description,
message_security_mode,
is_closed: false,
}
}
pub(crate) fn validate_timed_out(&self) -> Result<(), StatusCode> {
let elapsed = Instant::now() - **self.last_service_request.load();
self.last_service_request.store(Arc::new(Instant::now()));
if self.session_timeout < elapsed {
error!("Session has timed out because too much time has elapsed between service calls - elapsed time = {}ms", elapsed.as_millis());
Err(StatusCode::BadSessionIdInvalid)
} else {
Ok(())
}
}
pub fn deadline(&self) -> Instant {
**self.last_service_request.load() + self.session_timeout
}
pub(crate) fn validate_activated(&self) -> Result<&UserToken, StatusCode> {
if self.is_closed {
return Err(StatusCode::BadSessionClosed);
}
if let Some(token) = &self.user_token {
Ok(token)
} else {
Err(StatusCode::BadSessionNotActivated)
}
}
pub(crate) fn validate_secure_channel_id(
&self,
secure_channel_id: u32,
) -> Result<(), StatusCode> {
if secure_channel_id != self.secure_channel_id {
Err(StatusCode::BadSecureChannelIdInvalid)
} else {
Ok(())
}
}
pub(crate) fn activate(
&mut self,
secure_channel_id: u32,
server_nonce: ByteString,
identity: IdentityToken,
locale_ids: Option<Vec<UAString>>,
user_token: UserToken,
) {
self.user_token = Some(user_token);
self.secure_channel_id = secure_channel_id;
self.session_nonce = server_nonce;
self.user_identity = identity;
self.locale_ids = locale_ids;
}
pub(crate) fn close(&mut self) {
self.is_closed = true;
}
pub fn session_id(&self) -> &NodeId {
&self.session_id
}
pub fn endpoint_url(&self) -> &UAString {
&self.endpoint_url
}
pub fn client_certificate(&self) -> Option<&X509> {
self.client_certificate.as_ref()
}
pub fn session_nonce(&self) -> &ByteString {
&self.session_nonce
}
pub fn is_activated(&self) -> bool {
self.user_token.is_some() && !self.is_closed
}
pub fn secure_channel_id(&self) -> u32 {
self.secure_channel_id
}
pub(crate) fn add_browse_continuation_point(
&mut self,
cp: BrowseContinuationPoint,
) -> Result<(), ()> {
if self.max_browse_continuation_points <= self.browse_continuation_points.len()
&& self.max_browse_continuation_points > 0
{
Err(())
} else {
self.browse_continuation_points.insert(cp.id.clone(), cp);
Ok(())
}
}
pub(crate) fn remove_browse_continuation_point(
&mut self,
id: &ByteString,
) -> Option<BrowseContinuationPoint> {
self.browse_continuation_points.remove(id)
}
pub(crate) fn add_history_continuation_point(
&mut self,
id: &ByteString,
cp: ContinuationPoint,
) -> Result<(), ()> {
if self.max_history_continuation_points <= self.history_continuation_points.len()
&& self.max_history_continuation_points > 0
{
Err(())
} else {
self.history_continuation_points.insert(id.clone(), cp);
Ok(())
}
}
pub(crate) fn remove_history_continuation_point(
&mut self,
id: &ByteString,
) -> Option<ContinuationPoint> {
self.history_continuation_points.remove(id)
}
pub(crate) fn add_query_continuation_point(
&mut self,
id: &ByteString,
cp: QueryContinuationPoint,
) -> Result<(), ()> {
if self.max_query_continuation_points <= self.query_continuation_points.len()
&& self.max_query_continuation_points > 0
{
Err(())
} else {
self.query_continuation_points.insert(id.clone(), cp);
Ok(())
}
}
pub(crate) fn remove_query_continuation_point(
&mut self,
id: &ByteString,
) -> Option<QueryContinuationPoint> {
self.query_continuation_points.remove(id)
}
pub fn application_description(&self) -> &ApplicationDescription {
&self.application_description
}
pub fn user_token(&self) -> Option<&UserToken> {
self.user_token.as_ref()
}
pub fn message_security_mode(&self) -> MessageSecurityMode {
self.message_security_mode
}
pub fn session_id_numeric(&self) -> u32 {
self.session_id_numeric
}
pub fn max_request_message_size(&self) -> u32 {
self.max_request_message_size
}
pub fn max_response_message_size(&self) -> u32 {
self.max_response_message_size
}
pub fn session_name(&self) -> &str {
self.session_name.as_ref()
}
pub fn security_policy_uri(&self) -> &str {
&self.security_policy_uri
}
}