use crate::client::{LightstreamerClient, Transport};
use crate::subscription::{
ChannelSubscriptionListener, ItemUpdate, Snapshot, Subscription, SubscriptionMode,
};
use crate::utils::LightstreamerError;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify, mpsc};
/// Connection settings used to build a [`SimpleClient`].
///
/// Only `server_address` is required; the remaining fields are optional and
/// are normally filled in through the chainable setter methods below.
///
/// `Debug` is implemented by hand (rather than derived) so that the password
/// is never written into logs or panic messages.
#[derive(Clone)]
pub struct ClientConfig {
    /// Address of the server to connect to.
    pub server_address: String,
    /// Optional adapter set name passed to the underlying client.
    pub adapter_set: Option<String>,
    /// Optional user name passed to the underlying client.
    pub username: Option<String>,
    /// Optional password passed to the underlying client.
    pub password: Option<String>,
    /// Transport to force on the connection; `new` defaults this to
    /// `Transport::WsStreaming`.
    pub transport: Option<Transport>,
    /// Keepalive interval forwarded to the connection options when set.
    /// NOTE(review): unit is whatever `set_keepalive_interval` expects
    /// (presumably milliseconds) — confirm.
    pub keepalive_interval: Option<u64>,
    /// Idle timeout forwarded to the connection options when set.
    /// NOTE(review): unit presumably milliseconds — confirm.
    pub idle_timeout: Option<u64>,
    /// Reconnect timeout forwarded to the connection options when set.
    /// NOTE(review): unit presumably milliseconds — confirm.
    pub reconnect_timeout: Option<u64>,
}

impl std::fmt::Debug for ClientConfig {
    /// Formats the configuration like a derived `Debug` would, except that a
    /// present password is rendered as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the secret.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("ClientConfig")
            .field("server_address", &self.server_address)
            .field("adapter_set", &self.adapter_set)
            .field("username", &self.username)
            .field("password", &password)
            .field("transport", &self.transport)
            .field("keepalive_interval", &self.keepalive_interval)
            .field("idle_timeout", &self.idle_timeout)
            .field("reconnect_timeout", &self.reconnect_timeout)
            .finish()
    }
}

impl ClientConfig {
    /// Creates a configuration for `server_address`.
    ///
    /// The transport defaults to `Transport::WsStreaming`; every other
    /// optional field starts out as `None`.
    #[must_use]
    pub fn new(server_address: impl Into<String>) -> Self {
        Self {
            server_address: server_address.into(),
            adapter_set: None,
            username: None,
            password: None,
            transport: Some(Transport::WsStreaming),
            keepalive_interval: None,
            idle_timeout: None,
            reconnect_timeout: None,
        }
    }

    /// Sets the adapter set name and returns the updated configuration.
    #[must_use]
    pub fn adapter_set(mut self, adapter_set: impl Into<String>) -> Self {
        self.adapter_set = Some(adapter_set.into());
        self
    }

    /// Sets the user name and returns the updated configuration.
    #[must_use]
    pub fn username(mut self, username: impl Into<String>) -> Self {
        self.username = Some(username.into());
        self
    }

    /// Sets the password and returns the updated configuration.
    #[must_use]
    pub fn password(mut self, password: impl Into<String>) -> Self {
        self.password = Some(password.into());
        self
    }

    /// Sets the transport to force and returns the updated configuration.
    #[must_use]
    pub fn transport(mut self, transport: Transport) -> Self {
        self.transport = Some(transport);
        self
    }

    /// Sets the keepalive interval and returns the updated configuration.
    #[must_use]
    pub fn keepalive_interval(mut self, interval: u64) -> Self {
        self.keepalive_interval = Some(interval);
        self
    }

    /// Sets the idle timeout and returns the updated configuration.
    #[must_use]
    pub fn idle_timeout(mut self, timeout: u64) -> Self {
        self.idle_timeout = Some(timeout);
        self
    }

    /// Sets the reconnect timeout and returns the updated configuration.
    #[must_use]
    pub fn reconnect_timeout(mut self, timeout: u64) -> Self {
        self.reconnect_timeout = Some(timeout);
        self
    }
}
/// Describes a subscription to be opened through [`SimpleClient::subscribe`].
#[derive(Debug, Clone)]
pub struct SubscriptionParams {
    /// Subscription mode handed to `Subscription::new`.
    pub mode: SubscriptionMode,
    /// Item names to subscribe to.
    pub items: Vec<String>,
    /// Field names requested for each item.
    pub fields: Vec<String>,
    /// Optional data adapter name; left unset unless `data_adapter` is called.
    pub data_adapter: Option<String>,
    /// Snapshot preference; `new` initialises this to `Snapshot::Yes`.
    pub snapshot: Option<Snapshot>,
}

impl SubscriptionParams {
    /// Builds parameters for the given mode, items and fields.
    ///
    /// No data adapter is selected and the snapshot preference starts as
    /// `Some(Snapshot::Yes)`.
    #[must_use]
    pub fn new(mode: SubscriptionMode, items: Vec<String>, fields: Vec<String>) -> Self {
        let data_adapter = None;
        let snapshot = Some(Snapshot::Yes);
        Self {
            mode,
            items,
            fields,
            data_adapter,
            snapshot,
        }
    }

    /// Returns the parameters with the data adapter replaced by `adapter`.
    #[must_use]
    pub fn data_adapter(self, adapter: impl Into<String>) -> Self {
        Self {
            data_adapter: Some(adapter.into()),
            ..self
        }
    }

    /// Returns the parameters with the snapshot preference replaced by
    /// `snapshot`.
    #[must_use]
    pub fn snapshot(self, snapshot: Snapshot) -> Self {
        Self {
            snapshot: Some(snapshot),
            ..self
        }
    }
}
/// Cloneable handle around a [`LightstreamerClient`] guarded by an async mutex,
/// paired with the [`Notify`] used as its shutdown signal.
///
/// Both members live in [`Arc`]s, so clones of a `SimpleClient` refer to the
/// same underlying client and the same shutdown signal.
#[derive(Clone)]
pub struct SimpleClient {
    client: Arc<Mutex<LightstreamerClient>>,
    shutdown_signal: Arc<Notify>,
}

impl SimpleClient {
    /// Builds a client from `config`.
    ///
    /// The server address, adapter set, user name and password are passed to
    /// `LightstreamerClient::new`; the transport and the optional timing values
    /// are then applied to the client's connection options when present.
    ///
    /// # Errors
    ///
    /// Returns an error if `LightstreamerClient::new` fails or if one of the
    /// connection-option setters rejects the supplied value.
    pub fn new(config: ClientConfig) -> Result<Self, LightstreamerError> {
        let mut client = LightstreamerClient::new(
            Some(&config.server_address),
            config.adapter_set.as_deref(),
            config.username.as_deref(),
            config.password.as_deref(),
        )?;

        if let Some(transport) = config.transport {
            client
                .connection_options
                .set_forced_transport(Some(transport));
        }
        if let Some(interval) = config.keepalive_interval {
            client
                .connection_options
                .set_keepalive_interval(interval)
                .map_err(|e| format!("Failed to set keepalive interval: {e}"))?;
        }
        if let Some(timeout) = config.idle_timeout {
            client
                .connection_options
                .set_idle_timeout(timeout)
                .map_err(|e| format!("Failed to set idle timeout: {e}"))?;
        }
        if let Some(timeout) = config.reconnect_timeout {
            client
                .connection_options
                .set_reconnect_timeout(timeout)
                .map_err(|e| format!("Failed to set reconnect timeout: {e}"))?;
        }

        Ok(Self {
            client: Arc::new(Mutex::new(client)),
            shutdown_signal: Arc::new(Notify::new()),
        })
    }

    /// Registers a subscription described by `params` and returns the receiving
    /// end of the channel on which its listener publishes [`ItemUpdate`]s.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscription cannot be constructed or
    /// configured, or if `LightstreamerClient::subscribe` fails.
    pub async fn subscribe(
        &self,
        params: SubscriptionParams,
    ) -> Result<mpsc::UnboundedReceiver<ItemUpdate>, LightstreamerError> {
        let mut subscription =
            Subscription::new(params.mode, Some(params.items), Some(params.fields))?;
        if let Some(adapter) = params.data_adapter {
            subscription.set_data_adapter(Some(adapter))?;
        }
        if let Some(snapshot) = params.snapshot {
            subscription.set_requested_snapshot(Some(snapshot))?;
        }

        let (listener, receiver) = ChannelSubscriptionListener::create_channel();
        subscription.add_listener(Box::new(listener));

        // Only a clone of the sender is needed, so the guard is released before
        // awaiting the subscribe call; the client mutex is not held while that
        // future is pending.
        let sender = {
            let client = self.client.lock().await;
            client.subscription_sender.clone()
        };
        LightstreamerClient::subscribe(sender, subscription).await?;

        Ok(receiver)
    }

    /// Runs `LightstreamerClient::connect` with shared handles to the wrapped
    /// client and to this handle's shutdown signal.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `LightstreamerClient::connect` returns.
    pub async fn connect(&self) -> Result<(), LightstreamerError> {
        LightstreamerClient::connect(self.client.clone(), self.shutdown_signal.clone()).await
    }

    /// Locks the wrapped client and calls its `disconnect` method.
    pub async fn disconnect(&self) {
        let mut client_guard = self.client.lock().await;
        client_guard.disconnect().await;
    }

    /// Returns another handle to the shared shutdown signal.
    #[must_use]
    pub fn shutdown_signal(&self) -> Arc<Notify> {
        self.shutdown_signal.clone()
    }

    /// Wakes every task currently waiting on the shutdown signal.
    ///
    /// This uses `Notify::notify_waiters`, which per the tokio documentation
    /// does not store a permit: a task that starts waiting only after this call
    /// is not woken by it.
    pub fn shutdown(&self) {
        self.shutdown_signal.notify_waiters();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every chained setter on `ClientConfig` must store exactly the value it
    /// was given.
    #[test]
    fn test_client_config_builder() {
        let address = "http://localhost:8080";
        let built = ClientConfig::new(address)
            .adapter_set("DEMO")
            .username("user")
            .password("pass")
            .transport(Transport::WsStreaming)
            .keepalive_interval(5000)
            .idle_timeout(120000)
            .reconnect_timeout(3000);

        assert_eq!(built.server_address, address);
        assert_eq!(built.adapter_set.as_deref(), Some("DEMO"));
        assert_eq!(built.username.as_deref(), Some("user"));
        assert_eq!(built.password.as_deref(), Some("pass"));
        assert_eq!(built.transport, Some(Transport::WsStreaming));
        assert_eq!(built.keepalive_interval, Some(5000));
        assert_eq!(built.idle_timeout, Some(120000));
        assert_eq!(built.reconnect_timeout, Some(3000));
    }

    /// `SubscriptionParams` must keep the constructor arguments and record the
    /// optional adapter and snapshot settings.
    #[test]
    fn test_subscription_params_builder() {
        let items = vec!["item1".to_string()];
        let fields = vec!["field1".to_string()];
        let built = SubscriptionParams::new(SubscriptionMode::Merge, items, fields)
            .data_adapter("QUOTE_ADAPTER")
            .snapshot(Snapshot::Yes);

        assert_eq!(built.mode, SubscriptionMode::Merge);
        assert_eq!(built.items, ["item1"]);
        assert_eq!(built.fields, ["field1"]);
        assert_eq!(built.data_adapter.as_deref(), Some("QUOTE_ADAPTER"));
        assert!(matches!(built.snapshot, Some(Snapshot::Yes)));
    }
}