use crate::helpers::{short_uuid, Backoff};
use crate::tekscope::new_connect_client;
use smol_str::SmolStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{broadcast, watch};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use crate::acquisition::{AcquisitionConfig, AcquisitionRunner};
use crate::client::options::DEFAULT_CAPACITY;
use crate::data::Acquisition;
use crate::errors::{ConnectionError, SubscriptionError, SubscriptionUpdateError, TekHsiError};
use crate::tekscope::{ConnectRequest, ConnectStatus};
use crate::SubscribeOptions;
/// Control handles kept by the client for the subscription it has started.
///
/// Both halves have a counterpart that is handed to the `AcquisitionRunner`
/// when the subscription is created.
struct SubscriptionHandle {
/// Token cancelled by `disconnect` and `Drop` to stop the subscription; a clone is passed to the runner.
shutdown: CancellationToken,
/// Sending half of the watch channel on which `update_symbols` publishes a replacement symbol list.
update_tx: watch::Sender<Vec<String>>,
}
/// Client for a connected instrument session.
///
/// Created by [`TekHsiClient::connect`]; holds the transport channel, the name
/// this client registered under, and at most one subscription handle.
pub struct TekHsiClient {
/// Name generated in `connect` and sent in every `ConnectRequest` issued by this client.
client_name: SmolStr,
/// Transport channel; cloned for each RPC client and for the acquisition runner.
channel: tonic::transport::Channel,
/// Set to `true` once a disconnect has been started, so that `Drop` does not issue another one.
disconnect_token: AtomicBool,
/// Handle of the current subscription, or `None` when no subscription has been started or it was cleared.
subscription: Mutex<Option<SubscriptionHandle>>,
}
impl TekHsiClient {
/// Removes and returns the stored subscription handle, leaving `None` behind.
///
/// A poisoned mutex is recovered instead of being reported as "no
/// subscription". The guarded value is a plain `Option` that is only ever
/// replaced as a whole, so it is still consistent after a panic in another
/// lock holder. Recovering it lets `disconnect` and `Drop` cancel a running
/// subscription even after such a panic.
fn take_subscription(&self) -> Option<SubscriptionHandle> {
    self.subscription
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take()
}
pub async fn connect(addr: &str) -> Result<TekHsiClient, TekHsiError> {
let addr = if addr.contains("://") {
addr.to_string()
} else {
format!("http://{addr}")
};
debug!("connecting to {}", addr);
let channel = tonic::transport::Endpoint::from_shared(addr)?
.connect()
.await?;
debug!("transport channel created");
let client_name: SmolStr = short_uuid().into();
let mut backoff = Backoff::new(5, 250, 10);
let mut connect = new_connect_client(channel.clone(), &client_name);
debug!("connect client created");
loop {
let reply = connect
.connect(ConnectRequest {
name: client_name.to_string(),
})
.await?
.into_inner();
debug!("connect reply status: {:?}", reply.status());
if reply.status() == ConnectStatus::ConnectstatusSuccess {
break;
}
let connection_error: ConnectionError = reply.status().into();
match connection_error {
ConnectionError::Timeout | ConnectionError::ScopeBusy => {
if !backoff.sleep_next().await {
error!(
"final connection attempt failed with status: {:?}",
connection_error
);
return Err(TekHsiError::Connection(connection_error));
}
warn!(
"connection attempt failed with status: {:?}, retrying...",
connection_error
);
}
ConnectionError::Fatal { .. } | ConnectionError::LostSync => {
error!(
"connection attempt failed with status: {:?}",
connection_error
);
return Err(TekHsiError::Connection(connection_error));
}
}
}
Ok(Self {
client_name,
channel,
disconnect_token: AtomicBool::new(false),
subscription: Mutex::new(None),
})
}
/// Stops any running subscription and tells the instrument this client is leaving.
///
/// The disconnect flag is raised first, so a later `Drop` does not send a
/// second disconnect request. If a subscription handle is present, its
/// shutdown token is cancelled and the call pauses for 500 ms before the
/// disconnect request is sent.
///
/// # Errors
///
/// Returns an error when the disconnect RPC fails.
pub async fn disconnect(&self) -> Result<(), TekHsiError> {
    self.disconnect_token.store(true, Ordering::SeqCst);
    debug!("disconnecting");

    match self.take_subscription() {
        Some(active) => {
            active.shutdown.cancel();
            // Grace period between cancelling the subscription and sending the disconnect.
            sleep(Duration::from_millis(500)).await;
        }
        None => {}
    }

    let mut rpc = new_connect_client(self.channel.clone(), &self.client_name);
    let request = ConnectRequest {
        name: self.client_name.to_string(),
    };
    let _ = rpc.disconnect(request).await?;
    Ok(())
}
/// Starts an acquisition subscription for `active_symbols` and returns its receiver.
///
/// Symbol names are lowercased (ASCII) and de-duplicated; the first
/// occurrence of each name is kept and the original order is preserved.
/// An `opts.capacity` of `0` selects `DEFAULT_CAPACITY` for the broadcast
/// channel. `opts.download_chunk_size` and `opts.decode_buffer_capacity`
/// are forwarded to the acquisition configuration.
///
/// Only one subscription may be live at a time. A stored handle whose
/// shutdown token is cancelled or whose update channel is closed is treated
/// as stale and replaced.
///
/// # Errors
///
/// * `SubscriptionError::EmptySymbols` if `active_symbols` is empty.
/// * `SubscriptionError::Fatal` if the subscription mutex is poisoned.
/// * `SubscriptionError::AlreadyActive` if a live subscription exists.
pub fn subscribe(
    &self,
    mut active_symbols: Vec<String>,
    opts: SubscribeOptions,
) -> Result<Receiver<Acquisition>, TekHsiError> {
    if active_symbols.is_empty() {
        return Err(SubscriptionError::EmptySymbols.into());
    }
    for symbol in &mut active_symbols {
        symbol.make_ascii_lowercase();
    }
    // `Vec::dedup` only collapses *adjacent* repeats, so non-adjacent duplicates
    // (e.g. "ch1", "ch2", "CH1") would slip through; track seen names instead.
    let mut seen = std::collections::HashSet::with_capacity(active_symbols.len());
    active_symbols.retain(|symbol| seen.insert(symbol.clone()));

    // The guard is held until the new handle is stored below, so the
    // "already active" check and the replacement happen under one lock.
    let mut subscription = self
        .subscription
        .lock()
        .map_err(|_| SubscriptionError::Fatal)?;
    if let Some(handle) = subscription.as_ref() {
        if !handle.shutdown.is_cancelled() && !handle.update_tx.is_closed() {
            return Err(SubscriptionError::AlreadyActive.into());
        }
        // Stale handle: discard it before installing the new one.
        subscription.take();
    }

    let capacity = if opts.capacity == 0 {
        DEFAULT_CAPACITY
    } else {
        opts.capacity
    };
    debug!(
        "starting acquisition loop for symbols: {:?}",
        active_symbols
    );

    let shutdown = CancellationToken::new();
    let (sender, receiver) = broadcast::channel(capacity);
    let (update_tx, update_rx) = watch::channel(active_symbols.clone());
    let config = AcquisitionConfig {
        client_name: self.client_name.clone(),
        active_symbols,
        download_chunk_size: opts.download_chunk_size,
        decode_batch_buffer: opts.decode_buffer_capacity,
    };
    AcquisitionRunner::new(
        self.channel.clone(),
        config,
        sender,
        shutdown.clone(),
        update_rx,
    )
    .spawn();

    *subscription = Some(SubscriptionHandle {
        shutdown,
        update_tx,
    });
    Ok(receiver)
}
/// Replaces the symbol list of the running subscription.
///
/// Symbol names are lowercased (ASCII) and de-duplicated; the first
/// occurrence of each name is kept and the original order is preserved.
/// The new list is published on the subscription's watch channel.
///
/// # Errors
///
/// * `SubscriptionUpdateError::EmptySymbols` if `active_symbols` is empty.
/// * `SubscriptionUpdateError::Fatal` if the subscription mutex is poisoned.
/// * `SubscriptionUpdateError::NotActive` if there is no subscription, or the
///   stored one is already cancelled or closed (the stale handle is cleared).
/// * `SubscriptionUpdateError::UpdateFailed` if the update cannot be sent
///   (the handle is cleared in that case as well).
pub fn update_symbols(&self, mut active_symbols: Vec<String>) -> Result<(), TekHsiError> {
    if active_symbols.is_empty() {
        return Err(SubscriptionUpdateError::EmptySymbols.into());
    }
    for symbol in &mut active_symbols {
        symbol.make_ascii_lowercase();
    }
    // `Vec::dedup` only collapses *adjacent* repeats, so non-adjacent duplicates
    // (e.g. "ch1", "ch2", "CH1") would slip through; track seen names instead.
    let mut seen = std::collections::HashSet::with_capacity(active_symbols.len());
    active_symbols.retain(|symbol| seen.insert(symbol.clone()));

    let mut subscription = self
        .subscription
        .lock()
        .map_err(|_| SubscriptionUpdateError::Fatal)?;
    let Some(handle) = subscription.as_ref() else {
        return Err(SubscriptionUpdateError::NotActive.into());
    };
    if handle.shutdown.is_cancelled() || handle.update_tx.is_closed() {
        // The subscription has already ended; drop the stale handle.
        *subscription = None;
        return Err(SubscriptionUpdateError::NotActive.into());
    }
    debug!("updating symbols for subscription: {:?}", active_symbols);
    if handle.update_tx.send(active_symbols).is_err() {
        // A failed send means the receiving side is gone.
        *subscription = None;
        return Err(SubscriptionUpdateError::UpdateFailed.into());
    }
    Ok(())
}
/// Asks the instrument which symbol names are currently available.
///
/// The returned names are converted to ASCII lowercase.
///
/// # Errors
///
/// Returns an error when the RPC fails, or a `TekHsiError::Connection`
/// built from the reply status when that status is not a success.
pub async fn list_available_symbols(&self) -> Result<Vec<String>, TekHsiError> {
    let mut rpc = new_connect_client(self.channel.clone(), &self.client_name);
    let request = ConnectRequest {
        name: self.client_name.to_string(),
    };
    let response = rpc.request_available_names(request).await?.into_inner();

    match response.status() {
        ConnectStatus::ConnectstatusSuccess => {
            let mut names = response.symbolnames;
            for name in names.iter_mut() {
                name.make_ascii_lowercase();
            }
            Ok(names)
        }
        failure => Err(TekHsiError::Connection(ConnectionError::from(failure))),
    }
}
}
/// Best-effort cleanup when the client goes out of scope.
impl Drop for TekHsiClient {
    /// Cancels any stored subscription and, when a Tokio runtime is
    /// available, spawns a task that sends the disconnect request.
    ///
    /// Nothing is done if the disconnect flag was already set (for example by
    /// an earlier `disconnect` call). A failed disconnect is only logged.
    fn drop(&mut self) {
        // `swap` both reads and raises the flag, so the cleanup below runs at most once.
        let already_disconnected = self.disconnect_token.swap(true, Ordering::SeqCst);
        if already_disconnected {
            return;
        }

        if let Some(active) = self.take_subscription() {
            active.shutdown.cancel();
        }

        // `drop` cannot await, so the RPC has to be handed to a runtime; without one it is skipped.
        let Ok(runtime) = tokio::runtime::Handle::try_current() else {
            return;
        };
        let name = self.client_name.clone();
        let channel = self.channel.clone();
        runtime.spawn(async move {
            let request = ConnectRequest {
                name: name.to_string(),
            };
            let outcome = new_connect_client(channel, &name)
                .disconnect(request)
                .await;
            if let Err(err) = outcome {
                warn!("disconnect failed: {}", err);
            }
        });
    }
}