use crate::common::{ApiConfig, ExecutionEnvironment, LogType};
use crate::rest_api::RestApi;
use lightstreamer_client::ls_client::{LightstreamerClient, SubscriptionRequest, Transport};
use lightstreamer_client::subscription::Subscription;
use signal_hook::low_level::signal_name;
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Notify;
use tracing::{debug, error, info, trace, warn, Level};
/// Fallback number of connection attempts, used by `StreamingApi::new` when
/// the configuration leaves `streaming_api_max_connection_attempts` unset.
const MAX_CONNECTION_ATTEMPTS: u64 = 1;
/// Wrapper around a `LightstreamerClient` that is configured from the REST API
/// session (see `StreamingApi::new`) and driven by `StreamingApi::connect`.
pub struct StreamingApi {
/// Underlying Lightstreamer client that `connect` connects and disconnects.
ls_client: LightstreamerClient,
/// Upper bound on the number of connection attempts made by `connect`.
max_connection_attempts: u64,
/// Clone of the client's `subscription_sender`, exposed so callers can pass
/// it to `subscribe`, `subscribe_get_id` and `unsubscribe`.
pub subscription_sender: Sender<SubscriptionRequest>,
/// Selects how `make_log` / `log_msg` emit messages (stdout or `tracing`).
log_type: LogType,
}
impl StreamingApi {
/// Connects the Lightstreamer client, retrying on failure.
///
/// A SIGINT/SIGTERM hook is installed first and shares a `Notify` with the
/// client. The client is then connected up to `max_connection_attempts`
/// times. When `ls_client.connect` returns `Ok`, the client is disconnected
/// and the loop ends (presumably this happens once the session is shut down
/// via the shared signal - confirm against `lightstreamer_client`).
///
/// Between failed attempts the method waits for a growing interval that is
/// capped at 5000 ms. The wait (and its log line) is skipped after the last
/// attempt, because no retry follows it.
///
/// Nothing is returned; the outcome is only reported through `make_log`.
pub async fn connect(&mut self) {
    // Shared between the signal hook and the client so that a termination
    // signal can be forwarded to the running connection.
    let shutdown_signal = Arc::new(tokio::sync::Notify::new());
    StreamingApi::setup_signal_hook(Arc::clone(&shutdown_signal), self.log_type.clone()).await;

    let mut retry_interval_milis: u64 = 0;
    let mut retry_counter: u64 = 0;
    while retry_counter < self.max_connection_attempts {
        match self.ls_client.connect(Arc::clone(&shutdown_signal)).await {
            Ok(_) => {
                self.ls_client.disconnect().await;
                break;
            }
            Err(e) => {
                self.make_log(Level::ERROR, &format!("Failed to connect: {:?}", e));

                // Only announce and wait for a retry when one will actually happen.
                // `retry_counter < max_connection_attempts` holds here, so `+ 1` cannot overflow.
                if retry_counter + 1 < self.max_connection_attempts {
                    // Log the delay *before* sleeping so the message matches the real wait.
                    self.make_log(
                        Level::INFO,
                        &format!(
                            "Retrying connection in {:.2} seconds...",
                            retry_interval_milis as f64 / 1000.0
                        ),
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await;
                    // Grow the back-off for the next failure, capped at five seconds.
                    retry_interval_milis = (retry_interval_milis + (200 * retry_counter)).min(5000);
                }
                retry_counter += 1;
            }
        }
    }

    // The counter only reaches the limit when every attempt failed
    // (a successful attempt breaks out before incrementing it).
    if retry_counter == self.max_connection_attempts {
        self.make_log(
            Level::ERROR,
            &format!("Failed to connect after {} retries. Exiting...", retry_counter),
        );
    } else {
        self.make_log(Level::INFO, "Exiting orderly from Lightstreamer client...");
    }
}
/// Builds a `StreamingApi` from an optional configuration and an initial set
/// of subscriptions.
///
/// Steps, in order:
/// 1. Resolve the configuration (`ApiConfig::default()` when `config` is `None`).
/// 2. Create the REST API and, when `auto_login` is not enabled, log in explicitly.
/// 3. Read the `cst` and `x-security-token` auth headers from the REST client.
/// 4. Create the Lightstreamer client against `<lightstreamer_endpoint>/lightstreamer`,
///    using the demo or live account number (per `execution_environment`) as user
///    and `CST-<cst>|XST-<token>` as password.
/// 5. Queue every subscription, force WebSocket streaming transport and set the
///    client's logging type.
///
/// # Errors
///
/// Returns an error when the REST API cannot be created, the explicit login
/// fails, the auth tokens are missing, or the Lightstreamer client cannot be
/// constructed.
pub async fn new(subscriptions: Vec<Subscription>, config: Option<ApiConfig>) -> Result<Self, Box<dyn Error>> {
    let api_config = config.unwrap_or_else(ApiConfig::default);
    let auto_login = api_config.auto_login.unwrap_or(false);
    let max_connection_attempts = api_config
        .streaming_api_max_connection_attempts
        .unwrap_or(MAX_CONNECTION_ATTEMPTS);

    // Keep our own log type and map it onto the client's equivalent enum.
    let api_log_type = api_config.logger.clone();
    let ls_client_log_type = match api_log_type {
        LogType::StdLogs => lightstreamer_client::ls_client::LogType::StdLogs,
        LogType::TracingLogs => lightstreamer_client::ls_client::LogType::TracingLogs,
    };

    let mut rest_api = RestApi::new(api_config).await.map_err(|e| {
        Box::<dyn Error>::from(format!("Failed to create and initialize REST API: {}", e))
    })?;

    if !auto_login {
        // NOTE(review): assumes `login` is an async fn returning `Result` - confirm
        // against the REST client. The future must be awaited or the login never
        // runs, and a failure is surfaced here rather than later as a generic
        // "not authenticated" error from `get_tokens`.
        if let Err(e) = rest_api.client.login().await {
            return Err(Box::<dyn Error>::from(format!(
                "Failed to login to REST API: {}",
                e
            )));
        }
    }

    let (cst, x_security_token) = StreamingApi::get_tokens(&rest_api).map_err(|e| {
        Box::<dyn Error>::from(format!(
            "Failed to get CST and X-SECURITY-TOKEN from REST API: {}",
            e
        ))
    })?;

    let server_address = format!("{}/lightstreamer", &rest_api.client.lightstreamer_endpoint);
    let password = format!("CST-{}|XST-{}", cst, x_security_token);
    let mut ls_client = LightstreamerClient::new(
        Some(&server_address),
        None,
        // The user name is the account number of the selected environment.
        match rest_api.config.execution_environment {
            ExecutionEnvironment::Demo => Some(&rest_api.config.account_number_demo),
            ExecutionEnvironment::Live => Some(&rest_api.config.account_number_live),
        },
        Some(&password),
    )?;

    for subscription in subscriptions {
        LightstreamerClient::subscribe(ls_client.subscription_sender.clone(), subscription);
    }

    ls_client
        .connection_options
        .set_forced_transport(Some(Transport::WsStreaming));
    ls_client.set_logging_type(ls_client_log_type);

    // Expose a sender clone so callers can manage subscriptions after construction.
    let subscription_sender = ls_client.subscription_sender.clone();
    Ok(Self {
        ls_client,
        max_connection_attempts,
        subscription_sender,
        log_type: api_log_type,
    })
}
fn get_tokens(rest_api: &RestApi) -> Result<(String, String), Box<dyn Error>> {
let auth_headers = match rest_api.client.auth_headers {
Some(ref headers) => headers,
None => {
return Err(Box::<dyn Error>::from(
"Client not authenticated, auth headers not found.",
));
}
};
let cst = match auth_headers.get("cst") {
Some(cst) => match cst.to_str() {
Ok(cst) => cst.to_string(),
Err(_) => {
return Err(Box::<dyn Error>::from(
"Client not authenticated, CST auth header not found.",
));
}
},
None => {
return Err(Box::<dyn Error>::from(
"Client not authenticated, CST auth header not found.",
));
}
};
let x_security_token = match auth_headers.get("x-security-token") {
Some(x_security_token) => match x_security_token.to_str() {
Ok(x_security_token) => x_security_token.to_string(),
Err(_) => {
return Err(Box::<dyn Error>::from(
"Client not authenticated, X-SECURITY-TOKEN auth header not found.",
));
}
},
None => {
return Err(Box::<dyn Error>::from(
"Client not authenticated, X-SECURITY-TOKEN auth header not found.",
));
}
};
Ok((cst, x_security_token))
}
/// Installs a SIGINT/SIGTERM listener that notifies `shutdown_signal` once.
///
/// The first signal received is logged through `log_msg` and forwarded with
/// `Notify::notify_one`; the listener then stops.
///
/// `Signals::forever()` is a blocking iterator, so it runs on a dedicated OS
/// thread rather than inside a Tokio task, where it would park a runtime
/// worker until a signal arrives.
///
/// # Panics
///
/// Panics if the signal iterator cannot be created.
async fn setup_signal_hook(shutdown_signal: Arc<Notify>, log_type: LogType) {
    let signals = &[SIGINT, SIGTERM];
    let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");

    // Detached on purpose: the thread ends after the first signal, or with the process.
    std::thread::spawn(move || {
        if let Some(signal) = signals_iterator.forever().next() {
            // Only SIGINT/SIGTERM are registered, but avoid panicking on an unnamed signal.
            let name = signal_name(signal).unwrap_or("UNKNOWN");
            Self::log_msg(&log_type, Level::INFO, &format!("Received signal: {}", name));
            shutdown_signal.notify_one();
        }
    });
}
/// Forwards `subscription` to `LightstreamerClient::subscribe` using the
/// given sender. Nothing is returned to the caller.
pub fn subscribe(
    subscription_sender: Sender<SubscriptionRequest>,
    subscription: Subscription,
) {
    LightstreamerClient::subscribe(subscription_sender, subscription);
}
/// Forwards `subscription` to `LightstreamerClient::subscribe_get_id` and
/// returns its result unchanged.
///
/// # Errors
///
/// Propagates whatever error `LightstreamerClient::subscribe_get_id` reports.
pub async fn subscribe_get_id(
    subscription_sender: Sender<SubscriptionRequest>,
    subscription: Subscription,
) -> Result<usize, Box<dyn Error + Send + Sync>> {
    let subscription_id =
        LightstreamerClient::subscribe_get_id(subscription_sender, subscription).await;
    subscription_id
}
/// Forwards `subscription_id` to `LightstreamerClient::unsubscribe` using the
/// given sender. Nothing is returned to the caller.
pub fn unsubscribe(
    subscription_sender: Sender<SubscriptionRequest>,
    subscription_id: usize,
) {
    LightstreamerClient::unsubscribe(subscription_sender, subscription_id);
}
/// Emits `log` at `loglevel` using this instance's configured log type.
///
/// Only reads `self.log_type`, so a shared borrow is sufficient; callers
/// holding `&mut self` can still call it unchanged.
pub fn make_log(&self, loglevel: Level, log: &str) {
    Self::log_msg(&self.log_type, loglevel, log);
}
pub fn log_msg(log_type: &LogType, loglevel: Level, log: &str) {
match log_type {
LogType::StdLogs => {
println!("{}", log);
}
LogType::TracingLogs => match loglevel {
Level::INFO => {
info!(log);
}
Level::WARN => {
warn!(log);
}
Level::ERROR => {
error!(log);
}
Level::TRACE => {
trace!(log);
}
Level::DEBUG => {
debug!(log);
}
},
}
}
}