use std::sync::Arc;
use crate::core::supported_message::SupportedMessage;
use crate::sync::*;
use crate::types::{status_code::StatusCode, *};
use crate::server::{
address_space::AddressSpace, services::Service, session::Session, state::ServerState,
subscriptions::subscription::Subscription,
};
pub(crate) struct SubscriptionService;
impl Service for SubscriptionService {
fn name(&self) -> String {
String::from("SubscriptionService")
}
}
impl SubscriptionService {
pub fn new() -> SubscriptionService {
SubscriptionService {}
}
pub fn create_subscription(
&self,
server_state: Arc<RwLock<ServerState>>,
session: Arc<RwLock<Session>>,
request: &CreateSubscriptionRequest,
) -> SupportedMessage {
let mut server_state = trace_write_lock!(server_state);
let mut session = trace_write_lock!(session);
let subscriptions = session.subscriptions_mut();
if server_state.max_subscriptions > 0
&& subscriptions.len() >= server_state.max_subscriptions
{
self.service_fault(&request.request_header, StatusCode::BadTooManySubscriptions)
} else {
let subscription_id = server_state.create_subscription_id();
let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
Self::revise_subscription_values(
&server_state,
request.requested_publishing_interval,
request.requested_max_keep_alive_count,
request.requested_lifetime_count,
);
let publishing_enabled = request.publishing_enabled;
let subscription = Subscription::new(
server_state.diagnostics.clone(),
subscription_id,
publishing_enabled,
revised_publishing_interval,
revised_lifetime_count,
revised_max_keep_alive_count,
request.priority,
);
subscriptions.insert(subscription_id, subscription);
CreateSubscriptionResponse {
response_header: ResponseHeader::new_good(&request.request_header),
subscription_id,
revised_publishing_interval,
revised_lifetime_count,
revised_max_keep_alive_count,
}
.into()
}
}
pub fn modify_subscription(
&self,
server_state: Arc<RwLock<ServerState>>,
session: Arc<RwLock<Session>>,
request: &ModifySubscriptionRequest,
) -> SupportedMessage {
let server_state = trace_write_lock!(server_state);
let mut session = trace_write_lock!(session);
let subscriptions = session.subscriptions_mut();
let subscription_id = request.subscription_id;
if !subscriptions.contains(subscription_id) {
self.service_fault(
&request.request_header,
StatusCode::BadSubscriptionIdInvalid,
)
} else {
let subscription = subscriptions.get_mut(subscription_id).unwrap();
let (revised_publishing_interval, revised_max_keep_alive_count, revised_lifetime_count) =
SubscriptionService::revise_subscription_values(
&server_state,
request.requested_publishing_interval,
request.requested_max_keep_alive_count,
request.requested_lifetime_count,
);
subscription.set_publishing_interval(revised_publishing_interval);
subscription.set_max_keep_alive_count(revised_max_keep_alive_count);
subscription.set_max_lifetime_count(revised_lifetime_count);
subscription.set_priority(request.priority);
subscription.reset_lifetime_counter();
subscription.reset_keep_alive_counter();
ModifySubscriptionResponse {
response_header: ResponseHeader::new_good(&request.request_header),
revised_publishing_interval,
revised_lifetime_count,
revised_max_keep_alive_count,
}
.into()
}
}
pub fn set_publishing_mode(
&self,
session: Arc<RwLock<Session>>,
request: &SetPublishingModeRequest,
) -> SupportedMessage {
if is_empty_option_vec!(request.subscription_ids) {
self.service_fault(&request.request_header, StatusCode::BadNothingToDo)
} else {
let mut session = trace_write_lock!(session);
let subscription_ids = request.subscription_ids.as_ref().unwrap();
let results = {
let publishing_enabled = request.publishing_enabled;
let mut results = Vec::with_capacity(subscription_ids.len());
let subscriptions = session.subscriptions_mut();
for subscription_id in subscription_ids {
if let Some(subscription) = subscriptions.get_mut(*subscription_id) {
subscription.set_publishing_enabled(publishing_enabled);
subscription.reset_lifetime_counter();
results.push(StatusCode::Good);
} else {
results.push(StatusCode::BadSubscriptionIdInvalid);
}
}
Some(results)
};
let diagnostic_infos = None;
SetPublishingModeResponse {
response_header: ResponseHeader::new_good(&request.request_header),
results,
diagnostic_infos,
}
.into()
}
}
pub fn transfer_subscriptions(
&self,
_session: Arc<RwLock<Session>>,
request: &TransferSubscriptionsRequest,
) -> SupportedMessage {
if is_empty_option_vec!(request.subscription_ids) {
self.service_fault(&request.request_header, StatusCode::BadNothingToDo)
} else {
let subscription_ids = request.subscription_ids.as_ref().unwrap();
let results = {
let results = subscription_ids
.iter()
.map(|_subscription_id| TransferResult {
status_code: StatusCode::BadSubscriptionIdInvalid,
available_sequence_numbers: None,
})
.collect::<Vec<TransferResult>>();
Some(results)
};
let diagnostic_infos = None;
TransferSubscriptionsResponse {
response_header: ResponseHeader::new_good(&request.request_header),
results,
diagnostic_infos,
}
.into()
}
}
pub fn delete_subscriptions(
&self,
session: Arc<RwLock<Session>>,
request: &DeleteSubscriptionsRequest,
) -> SupportedMessage {
if is_empty_option_vec!(request.subscription_ids) {
self.service_fault(&request.request_header, StatusCode::BadNothingToDo)
} else {
let mut session = trace_write_lock!(session);
let subscription_ids = request.subscription_ids.as_ref().unwrap();
let results = {
let subscriptions = session.subscriptions_mut();
let results = subscription_ids
.iter()
.map(|subscription_id| {
let subscription = subscriptions.remove(*subscription_id);
if subscription.is_some() {
StatusCode::Good
} else {
StatusCode::BadSubscriptionIdInvalid
}
})
.collect::<Vec<StatusCode>>();
Some(results)
};
let diagnostic_infos = None;
DeleteSubscriptionsResponse {
response_header: ResponseHeader::new_good(&request.request_header),
results,
diagnostic_infos,
}
.into()
}
}
pub fn async_publish(
&self,
now: &DateTimeUtc,
session: Arc<RwLock<Session>>,
address_space: Arc<RwLock<AddressSpace>>,
request_id: u32,
request: &PublishRequest,
) -> Option<SupportedMessage> {
trace!("--> Receive a PublishRequest {:?}", request);
let mut session = trace_write_lock!(session);
if session.subscriptions().is_empty() {
Some(self.service_fault(&request.request_header, StatusCode::BadNoSubscription))
} else {
let address_space = trace_read_lock!(address_space);
let request_header = request.request_header.clone();
let result =
session.enqueue_publish_request(now, request_id, request.clone(), &address_space);
if let Err(error) = result {
Some(self.service_fault(&request_header, error))
} else {
None
}
}
}
pub fn republish(
&self,
session: Arc<RwLock<Session>>,
request: &RepublishRequest,
) -> SupportedMessage {
trace!("Republish {:?}", request);
let mut session = trace_write_lock!(session);
let result = session
.subscriptions()
.find_notification_message(request.subscription_id, request.retransmit_sequence_number);
if let Ok(notification_message) = result {
session.reset_subscription_lifetime_counter(request.subscription_id);
let response = RepublishResponse {
response_header: ResponseHeader::new_good(&request.request_header),
notification_message,
};
response.into()
} else {
self.service_fault(&request.request_header, result.unwrap_err())
}
}
fn revise_subscription_values(
server_state: &ServerState,
requested_publishing_interval: Duration,
requested_max_keep_alive_count: u32,
requested_lifetime_count: u32,
) -> (Duration, u32, u32) {
let revised_publishing_interval = f64::max(
requested_publishing_interval,
server_state.min_publishing_interval_ms,
);
let revised_max_keep_alive_count =
if requested_max_keep_alive_count > server_state.max_keep_alive_count {
server_state.max_keep_alive_count
} else if requested_max_keep_alive_count == 0 {
server_state.default_keep_alive_count
} else {
requested_max_keep_alive_count
};
let min_lifetime_count = revised_max_keep_alive_count * 3;
let revised_lifetime_count = if requested_lifetime_count < min_lifetime_count {
min_lifetime_count
} else if requested_lifetime_count > server_state.max_lifetime_count {
server_state.max_lifetime_count
} else {
requested_lifetime_count
};
(
revised_publishing_interval,
revised_max_keep_alive_count,
revised_lifetime_count,
)
}
}