use crate::application::client::StreamerClient;
use crate::error::AppError;
use crate::model::streaming::StreamingMarketField;
use crate::presentation::price::PriceData;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{Notify, RwLock, mpsc};
use tracing::{debug, info, warn};
/// Market-price streamer whose set of subscribed EPICs can change at runtime.
///
/// Every piece of mutable state lives behind an `Arc<RwLock<..>>`, so clones
/// of a streamer operate on the same subscription list, connection and price
/// channel.
pub struct DynamicMarketStreamer {
    /// Streaming client of the current connection; `None` until a connection
    /// has been started and again after `disconnect`.
    client: Arc<RwLock<Option<StreamerClient>>>,
    /// EPICs that should be subscribed on the next (re)connection.
    epics: Arc<RwLock<HashSet<String>>>,
    /// Market fields requested for every subscription; fixed at construction.
    fields: HashSet<StreamingMarketField>,
    /// Sending half of the price channel, cloned into each forwarding task.
    price_tx: Arc<RwLock<Option<mpsc::UnboundedSender<PriceData>>>>,
    /// Receiving half of the price channel; handed out once by `get_receiver`.
    price_rx: Arc<RwLock<Option<mpsc::UnboundedReceiver<PriceData>>>>,
    /// Set to `true` when a connection task is started and back to `false`
    /// when that task ends or `disconnect` is called.
    is_connected: Arc<RwLock<bool>>,
    /// Notifier passed to the running connection task so it can be asked to
    /// stop; `None` before the first connection.
    shutdown_signal: Arc<RwLock<Option<Arc<Notify>>>>,
}
impl DynamicMarketStreamer {
    /// Pause between signalling the previous connection task to stop and
    /// starting its replacement during a reconnect.
    const RECONNECT_DELAY: tokio::time::Duration = tokio::time::Duration::from_millis(500);

    /// Creates a streamer with an empty subscription list and no connection.
    ///
    /// # Arguments
    ///
    /// * `fields` - Market fields to request for every subscribed EPIC.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `Result` is part of the
    /// public signature.
    pub async fn new(fields: HashSet<StreamingMarketField>) -> Result<Self, AppError> {
        let (price_tx, price_rx) = mpsc::unbounded_channel();
        Ok(Self {
            client: Arc::new(RwLock::new(None)),
            epics: Arc::new(RwLock::new(HashSet::new())),
            fields,
            price_tx: Arc::new(RwLock::new(Some(price_tx))),
            price_rx: Arc::new(RwLock::new(Some(price_rx))),
            is_connected: Arc::new(RwLock::new(false)),
            shutdown_signal: Arc::new(RwLock::new(None)),
        })
    }

    /// Adds `epic` to the subscription list.
    ///
    /// Adding an EPIC that is already present is a no-op. If a connection is
    /// currently flagged as active, it is restarted so the new EPIC takes
    /// effect.
    ///
    /// # Errors
    ///
    /// Returns any error raised while re-establishing the connection.
    pub async fn add(&self, epic: String) -> Result<(), AppError> {
        {
            let mut epics = self.epics.write().await;
            if epics.contains(&epic) {
                debug!("EPIC {} already subscribed", epic);
                return Ok(());
            }
            // Log first so the EPIC can be moved into the set without a clone.
            info!("Added EPIC {} to subscription list", epic);
            epics.insert(epic);
        }
        self.reconnect_if_connected().await
    }

    /// Removes `epic` from the subscription list.
    ///
    /// Removing an EPIC that is not present is a no-op. If a connection is
    /// currently flagged as active, it is restarted with the remaining EPICs;
    /// when none remain the old connection is only signalled to stop.
    ///
    /// # Errors
    ///
    /// Returns any error raised while re-establishing the connection.
    pub async fn remove(&self, epic: String) -> Result<(), AppError> {
        // The write guard is a temporary and is released at the end of this
        // statement, before any reconnect is attempted.
        let was_removed = self.epics.write().await.remove(&epic);
        if !was_removed {
            debug!("EPIC {} was not in subscription list", epic);
            return Ok(());
        }
        info!("Removed EPIC {} from subscription list", epic);
        self.reconnect_if_connected().await
    }

    /// Empties the subscription list.
    ///
    /// Unlike [`add`](Self::add) and [`remove`](Self::remove), this does not
    /// touch a running connection: the list is cleared but no reconnect is
    /// triggered, so a later `add` on a connected streamer still reconnects
    /// with just the newly added EPIC.
    ///
    /// # Errors
    ///
    /// The current implementation never fails.
    pub async fn clear(&self) -> Result<(), AppError> {
        let mut epics = self.epics.write().await;
        let count = epics.len();
        epics.clear();
        info!("Cleared {} EPICs from subscription list", count);
        Ok(())
    }

    /// Returns a snapshot of the currently listed EPICs (in no particular order).
    pub async fn get_epics(&self) -> Vec<String> {
        self.epics.read().await.iter().cloned().collect()
    }

    /// Takes the receiving half of the price channel.
    ///
    /// The receiver can be taken only once across all clones of the streamer.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::InvalidInput`] if the receiver was already taken.
    pub async fn get_receiver(&self) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
        self.price_rx
            .write()
            .await
            .take()
            .ok_or_else(|| AppError::InvalidInput("Receiver already taken".to_string()))
    }

    /// Notifies the running connection task, if any, that it should stop.
    async fn signal_shutdown(&self) {
        let shutdown_lock = self.shutdown_signal.read().await;
        if let Some(signal) = shutdown_lock.as_ref() {
            signal.notify_one();
        }
    }

    /// Restarts the connection, but only when one is currently flagged active.
    async fn reconnect_if_connected(&self) -> Result<(), AppError> {
        // Copy the flag out so the read guard is not held across the reconnect.
        let connected = *self.is_connected.read().await;
        if connected {
            self.reconnect().await?;
        }
        Ok(())
    }

    /// Stops the current connection and, if any EPICs are listed, starts a
    /// new one that subscribes to the updated list.
    async fn reconnect(&self) -> Result<(), AppError> {
        info!("Reconnecting with updated EPIC list...");
        self.signal_shutdown().await;
        tokio::time::sleep(Self::RECONNECT_DELAY).await;
        let has_epics = !self.epics.read().await.is_empty();
        if has_epics {
            self.start_internal().await?;
        }
        Ok(())
    }

    /// Creates a fresh client, subscribes to the listed EPICs and spawns the
    /// background tasks that forward prices and drive the connection.
    ///
    /// Returns `Ok(())` without connecting when no EPICs are listed.
    async fn start_internal(&self) -> Result<(), AppError> {
        let epics = self.get_epics().await;
        if epics.is_empty() {
            warn!("No EPICs to subscribe to");
            return Ok(());
        }
        info!("Starting connection with {} EPICs", epics.len());

        let mut new_client = StreamerClient::new().await?;
        let mut receiver = new_client
            .market_subscribe(epics, self.fields.clone())
            .await?;

        // Clone the sender out of the lock so the guard is released before
        // any further awaiting happens.
        let forward_tx = self.price_tx.read().await.clone();
        if let Some(tx) = forward_tx {
            tokio::spawn(async move {
                while let Some(price_data) = receiver.recv().await {
                    if tx.send(price_data).is_err() {
                        warn!("Failed to send price update: receiver dropped");
                        break;
                    }
                }
                debug!("Subscription forwarding task ended");
            });
        }

        // Order matters: the shutdown signal is registered before the flag is
        // raised, which the connection task relies on when it finishes.
        *self.client.write().await = Some(new_client);
        let signal = Arc::new(Notify::new());
        *self.shutdown_signal.write().await = Some(Arc::clone(&signal));
        *self.is_connected.write().await = true;

        let client = Arc::clone(&self.client);
        let is_connected = Arc::clone(&self.is_connected);
        let shutdown_signal = Arc::clone(&self.shutdown_signal);
        tokio::spawn(async move {
            let result = {
                // The client lock is held for the whole lifetime of the
                // connection; it is released once `connect` returns.
                let mut client_guard = client.write().await;
                match client_guard.as_mut() {
                    Some(c) => c.connect(Some(Arc::clone(&signal))).await,
                    None => Ok(()),
                }
            };
            {
                // Only clear the flag if this task still owns the registered
                // shutdown signal. After a reconnect a newer task has replaced
                // it, and clearing the flag here would wrongly mark the live
                // connection as disconnected.
                let current = shutdown_signal.read().await;
                if matches!(current.as_ref(), Some(s) if Arc::ptr_eq(s, &signal)) {
                    *is_connected.write().await = false;
                }
            }
            match result {
                Ok(_) => info!("Connection task completed successfully"),
                Err(e) => tracing::error!("Connection task failed: {:?}", e),
            }
        });
        info!("Connection task started in background");
        Ok(())
    }

    /// Starts streaming for the listed EPICs in the background and returns
    /// immediately.
    ///
    /// # Errors
    ///
    /// Returns any error raised while creating the client or subscribing.
    pub async fn start(&mut self) -> Result<(), AppError> {
        self.start_internal().await
    }

    /// Starts streaming, waits until the signal hook installed via
    /// `setup_signal_hook` fires, then disconnects.
    ///
    /// # Errors
    ///
    /// Returns any error raised while starting or disconnecting.
    pub async fn connect(&mut self) -> Result<(), AppError> {
        use lightstreamer_rs::utils::setup_signal_hook;

        self.start().await?;
        let signal = Arc::new(Notify::new());
        setup_signal_hook(Arc::clone(&signal)).await;
        signal.notified().await;
        self.disconnect().await?;
        Ok(())
    }

    /// Signals the connection task to stop, disconnects and drops the client,
    /// and marks the streamer as disconnected.
    ///
    /// # Errors
    ///
    /// Returns the client's disconnect error; in that case the client is kept
    /// and the connected flag is not cleared by this call.
    pub async fn disconnect(&mut self) -> Result<(), AppError> {
        self.signal_shutdown().await;
        let mut client_lock = self.client.write().await;
        if let Some(client) = client_lock.as_mut() {
            client.disconnect().await?;
        }
        *client_lock = None;
        *self.is_connected.write().await = false;
        info!("Disconnected from Lightstreamer server");
        Ok(())
    }
}
impl Clone for DynamicMarketStreamer {
    /// Produces a handle that shares all `Arc`-wrapped state with `self`;
    /// only the set of requested fields is copied.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            client,
            epics,
            fields,
            price_tx,
            price_rx,
            is_connected,
            shutdown_signal,
        } = self;
        Self {
            client: Arc::clone(client),
            epics: Arc::clone(epics),
            fields: fields.clone(),
            price_tx: Arc::clone(price_tx),
            price_rx: Arc::clone(price_rx),
            is_connected: Arc::clone(is_connected),
            shutdown_signal: Arc::clone(shutdown_signal),
        }
    }
}